diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index ba13ef392d912..cecf1d03418d7 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -391,8 +391,13 @@ impl Statistics { /// For example, if we had statistics for columns `{"a", "b", "c"}`, /// projecting to `vec![2, 1]` would return statistics for columns `{"c", /// "b"}`. - pub fn project(mut self, projection: Option<&Vec>) -> Self { - let Some(projection) = projection else { + pub fn project(self, projection: Option<&impl AsRef<[usize]>>) -> Self { + let projection = projection.map(AsRef::as_ref); + self.project_impl(projection) + } + + fn project_impl(mut self, projection: Option<&[usize]>) -> Self { + let Some(projection) = projection.map(AsRef::as_ref) else { return self; }; @@ -410,7 +415,7 @@ impl Statistics { .map(Slot::Present) .collect(); - for idx in projection { + for idx in projection.iter() { let next_idx = self.column_statistics.len(); let slot = std::mem::replace( columns.get_mut(*idx).expect("projection out of bounds"), @@ -1066,7 +1071,7 @@ mod tests { #[test] fn test_project_none() { - let projection = None; + let projection: Option> = None; let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); assert_eq!(stats, make_stats(vec![10, 20, 30])); } diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 03310a7bde193..016b188e3d6b8 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -70,10 +70,10 @@ use std::thread::available_parallelism; /// ``` pub fn project_schema( schema: &SchemaRef, - projection: Option<&Vec>, + projection: Option<&impl AsRef<[usize]>>, ) -> Result { let schema = match projection { - Some(columns) => Arc::new(schema.project(columns)?), + Some(columns) => Arc::new(schema.project(columns.as_ref())?), None => Arc::clone(schema), }; Ok(schema) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b1aa850284aee..6765b7f79fdd2 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1007,7 +1007,7 @@ impl DefaultPhysicalPlanner { // project the output columns excluding the async functions // The async functions are always appended to the end of the schema. .apply_projection(Some( - (0..input.schema().fields().len()).collect(), + (0..input.schema().fields().len()).collect::>(), ))? .with_batch_size(session_state.config().batch_size()) .build()? diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 9234a95591baa..b640159ca8463 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -762,7 +762,7 @@ async fn test_hash_join_swap_on_joins_with_projections( "ProjectionExec won't be added above if HashJoinExec contains embedded projection", ); - assert_eq!(swapped_join.projection, Some(vec![0_usize])); + assert_eq!(swapped_join.projection.as_deref().unwrap(), &[0_usize]); assert_eq!(swapped.schema().fields.len(), 1); assert_eq!(swapped.schema().fields[0].name(), "small_col"); Ok(()) diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 540fd620c92ce..ae83a74627bed 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -29,7 +29,8 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use datafusion_common::stats::{ColumnStatistics, Precision}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ - Result, ScalarValue, assert_or_internal_err, internal_datafusion_err, plan_err, + Result, ScalarValue, Statistics, assert_or_internal_err, internal_datafusion_err, + plan_err, }; use datafusion_physical_expr_common::metrics::ExecutionPlanMetricsSet; @@ -125,7 +126,8 @@ impl From for (Arc, String) { /// indices. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProjectionExprs { - exprs: Vec, + /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance. + exprs: Arc<[ProjectionExpr]>, } impl std::fmt::Display for ProjectionExprs { @@ -137,14 +139,16 @@ impl std::fmt::Display for ProjectionExprs { impl From> for ProjectionExprs { fn from(value: Vec) -> Self { - Self { exprs: value } + Self { + exprs: value.into(), + } } } impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { - exprs: value.to_vec(), + exprs: value.iter().cloned().collect(), } } } @@ -152,7 +156,7 @@ impl From<&[ProjectionExpr]> for ProjectionExprs { impl FromIterator for ProjectionExprs { fn from_iter>(exprs: T) -> Self { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into_iter().collect(), } } } @@ -164,12 +168,17 @@ impl AsRef<[ProjectionExpr]> for ProjectionExprs { } impl ProjectionExprs { - pub fn new(exprs: I) -> Self - where - I: IntoIterator, - { + /// Make a new [`ProjectionExprs`] from expressions iterator. + pub fn new(exprs: impl IntoIterator) -> Self { + Self { + exprs: exprs.into_iter().collect(), + } + } + + /// Make a new [`ProjectionExprs`] from expressions. + pub fn from_expressions(exprs: impl Into>) -> Self { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into(), } } @@ -285,13 +294,14 @@ impl ProjectionExprs { { let exprs = self .exprs - .into_iter() + .iter() + .cloned() .map(|mut proj| { proj.expr = f(proj.expr)?; Ok(proj) }) - .collect::>>()?; - Ok(Self::new(exprs)) + .collect::>>()?; + Ok(Self::from_expressions(exprs)) } /// Apply another projection on top of this projection, returning the combined projection. @@ -361,7 +371,7 @@ impl ProjectionExprs { /// applied on top of this projection. pub fn try_merge(&self, other: &ProjectionExprs) -> Result { let mut new_exprs = Vec::with_capacity(other.exprs.len()); - for proj_expr in &other.exprs { + for proj_expr in other.exprs.iter() { let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? .ok_or_else(|| { internal_datafusion_err!( @@ -602,12 +612,12 @@ impl ProjectionExprs { /// ``` pub fn project_statistics( &self, - mut stats: datafusion_common::Statistics, + mut stats: Statistics, output_schema: &Schema, - ) -> Result { + ) -> Result { let mut column_statistics = vec![]; - for proj_expr in &self.exprs { + for proj_expr in self.exprs.iter() { let expr = &proj_expr.expr; let col_stats = if let Some(col) = expr.as_any().downcast_ref::() { std::mem::take(&mut stats.column_statistics[col.index()]) @@ -754,13 +764,52 @@ impl Projector { } } -impl IntoIterator for ProjectionExprs { - type Item = ProjectionExpr; - type IntoIter = std::vec::IntoIter; +/// Describes an immutable reference counted projection. +/// +/// This structure represents projecting a set of columns by index. +/// [`Arc`] is used to make it cheap to clone. +pub type ProjectionRef = Arc<[usize]>; - fn into_iter(self) -> Self::IntoIter { - self.exprs.into_iter() - } +/// Combine two projections. +/// +/// If `p1` is [`None`] then there are no changes. +/// Otherwise, if passed `p2` is not [`None`] then it is remapped +/// according to the `p1`. Otherwise, there are no changes. +/// +/// # Example +/// +/// If stored projection is [0, 2] and we call `apply_projection([0, 2, 3])`, +/// then the resulting projection will be [0, 3]. +/// +/// # Error +/// +/// Returns an internal error if `p1` contains index that is greater than `p2` len. +/// +pub fn combine_projections( + p1: Option<&ProjectionRef>, + p2: Option<&ProjectionRef>, +) -> Result> { + let Some(p1) = p1 else { + return Ok(None); + }; + let Some(p2) = p2 else { + return Ok(Some(Arc::clone(p1))); + }; + + Ok(Some( + p1.iter() + .map(|i| { + let idx = *i; + assert_or_internal_err!( + idx < p2.len(), + "unable to apply projection: index {} is greater than new projection len {}", + idx, + p2.len(), + ); + Ok(p2[*i]) + }) + .collect::>>()?, + )) } /// The function operates in two modes: diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index acb1c588097ee..790669b5c9dbf 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -49,7 +49,7 @@ use datafusion_physical_plan::aggregates::{ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::EmissionType; use datafusion_physical_plan::joins::{ - CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec, + CrossJoinExec, HashJoinExec, HashJoinExecBuilder, PartitionMode, SortMergeJoinExec, }; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; @@ -305,18 +305,19 @@ pub fn adjust_input_keys_ordering( Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec, )| { - HashJoinExec::try_new( + HashJoinExecBuilder::new( Arc::clone(left), Arc::clone(right), new_conditions.0, - filter.clone(), - join_type, - // TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter. - projection.clone(), - PartitionMode::Partitioned, - *null_equality, - *null_aware, + *join_type, ) + .with_filter(filter.clone()) + // TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter. + .with_projection_ref(projection.clone()) + .with_partition_mode(PartitionMode::Partitioned) + .with_null_equality(*null_equality) + .with_null_aware(*null_aware) + .build() .map(|e| Arc::new(e) as _) }; return reorder_partitioned_join_keys( @@ -638,17 +639,20 @@ pub fn reorder_join_keys_to_inputs( right_keys, } = join_keys; let new_join_on = new_join_conditions(&left_keys, &right_keys); - return Ok(Arc::new(HashJoinExec::try_new( - Arc::clone(left), - Arc::clone(right), - new_join_on, - filter.clone(), - join_type, - projection.clone(), - PartitionMode::Partitioned, - *null_equality, - *null_aware, - )?)); + return Ok(Arc::new( + HashJoinExecBuilder::new( + Arc::clone(left), + Arc::clone(right), + new_join_on, + *join_type, + ) + .with_filter(filter.clone()) + .with_projection_ref(projection.clone()) + .with_partition_mode(PartitionMode::Partitioned) + .with_null_equality(*null_equality) + .with_null_aware(*null_aware) + .build()?, + )); } } } else if let Some(SortMergeJoinExec { diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 698fdea8e766e..2dc61ba2453fb 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -723,7 +723,7 @@ fn handle_hash_join( .collect(); let column_indices = build_join_column_index(plan); - let projected_indices: Vec<_> = if let Some(projection) = &plan.projection { + let projected_indices: Vec<_> = if let Some(projection) = plan.projection.as_ref() { projection.iter().map(|&i| &column_indices[i]).collect() } else { column_indices.iter().collect() diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index 7412d0ba97812..02ef378d704a0 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -34,7 +34,7 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_plan::execution_plan::EmissionType; use datafusion_physical_plan::joins::utils::ColumnIndex; use datafusion_physical_plan::joins::{ - CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, + CrossJoinExec, HashJoinExec, HashJoinExecBuilder, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode, SymmetricHashJoinExec, }; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; @@ -191,30 +191,18 @@ pub(crate) fn try_collect_left( { Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?)) } else { - Ok(Some(Arc::new(HashJoinExec::try_new( - Arc::clone(left), - Arc::clone(right), - hash_join.on().to_vec(), - hash_join.filter().cloned(), - hash_join.join_type(), - hash_join.projection.clone(), - PartitionMode::CollectLeft, - hash_join.null_equality(), - hash_join.null_aware, - )?))) + Ok(Some(Arc::new( + HashJoinExecBuilder::from(hash_join) + .with_partition_mode(PartitionMode::CollectLeft) + .build()?, + ))) } } - (true, false) => Ok(Some(Arc::new(HashJoinExec::try_new( - Arc::clone(left), - Arc::clone(right), - hash_join.on().to_vec(), - hash_join.filter().cloned(), - hash_join.join_type(), - hash_join.projection.clone(), - PartitionMode::CollectLeft, - hash_join.null_equality(), - hash_join.null_aware, - )?))), + (true, false) => Ok(Some(Arc::new( + HashJoinExecBuilder::from(hash_join) + .with_partition_mode(PartitionMode::CollectLeft) + .build()?, + ))), (false, true) => { // Don't swap null-aware anti joins as they have specific side requirements if hash_join.join_type().supports_swap() && !hash_join.null_aware { @@ -254,17 +242,11 @@ pub(crate) fn partitioned_hash_join( PartitionMode::Partitioned }; - Ok(Arc::new(HashJoinExec::try_new( - Arc::clone(left), - Arc::clone(right), - hash_join.on().to_vec(), - hash_join.filter().cloned(), - hash_join.join_type(), - hash_join.projection.clone(), - partition_mode, - hash_join.null_equality(), - hash_join.null_aware, - )?)) + Ok(Arc::new( + HashJoinExecBuilder::from(hash_join) + .with_partition_mode(partition_mode) + .build()?, + )) } } diff --git a/datafusion/physical-optimizer/src/projection_pushdown.rs b/datafusion/physical-optimizer/src/projection_pushdown.rs index 99922ba075cc0..44d0926a8b250 100644 --- a/datafusion/physical-optimizer/src/projection_pushdown.rs +++ b/datafusion/physical-optimizer/src/projection_pushdown.rs @@ -135,7 +135,7 @@ fn try_push_down_join_filter( ); let new_lhs_length = lhs_rewrite.data.0.schema().fields.len(); - let projections = match projections { + let projections = match projections.as_ref() { None => match join.join_type() { JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { // Build projections that ignore the newly projected columns. diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index aa0f5a236cac7..dcfa0456ac525 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -626,11 +626,14 @@ pub struct AggregateExec { /// Aggregation mode (full, partial) mode: AggregateMode, /// Group by expressions - group_by: PhysicalGroupBy, + /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance. + group_by: Arc, /// Aggregate expressions - aggr_expr: Vec>, + /// The same reason to [`Arc`] it as for [`Self::group_by`]. + aggr_expr: Arc<[Arc]>, /// FILTER (WHERE clause) expression for each aggregate expression - filter_expr: Vec>>, + /// The same reason to [`Arc`] it as for [`Self::group_by`]. + filter_expr: Arc<[Option>]>, /// Configuration for limit-based optimizations limit_options: Option, /// Input plan, could be a partial aggregate or the input to the aggregate @@ -664,18 +667,18 @@ impl AggregateExec { /// Rewrites aggregate exec with new aggregate expressions. pub fn with_new_aggr_exprs( &self, - aggr_expr: Vec>, + aggr_expr: impl Into]>>, ) -> Self { Self { - aggr_expr, + aggr_expr: aggr_expr.into(), // clone the rest of the fields required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, - group_by: self.group_by.clone(), - filter_expr: self.filter_expr.clone(), + group_by: Arc::clone(&self.group_by), + filter_expr: Arc::clone(&self.filter_expr), limit_options: self.limit_options, input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), @@ -694,9 +697,9 @@ impl AggregateExec { input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, - group_by: self.group_by.clone(), - aggr_expr: self.aggr_expr.clone(), - filter_expr: self.filter_expr.clone(), + group_by: Arc::clone(&self.group_by), + aggr_expr: Arc::clone(&self.aggr_expr), + filter_expr: Arc::clone(&self.filter_expr), input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), input_schema: Arc::clone(&self.input_schema), @@ -711,12 +714,13 @@ impl AggregateExec { /// Create a new hash aggregate execution plan pub fn try_new( mode: AggregateMode, - group_by: PhysicalGroupBy, + group_by: impl Into>, aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, ) -> Result { + let group_by = group_by.into(); let schema = create_schema(&input.schema(), &group_by, &aggr_expr, mode)?; let schema = Arc::new(schema); @@ -741,13 +745,16 @@ impl AggregateExec { /// the schema in such cases. fn try_new_with_schema( mode: AggregateMode, - group_by: PhysicalGroupBy, + group_by: impl Into>, mut aggr_expr: Vec>, - filter_expr: Vec>>, + filter_expr: impl Into>]>>, input: Arc, input_schema: SchemaRef, schema: SchemaRef, ) -> Result { + let group_by = group_by.into(); + let filter_expr = filter_expr.into(); + // Make sure arguments are consistent in size assert_eq_or_internal_err!( aggr_expr.len(), @@ -814,13 +821,13 @@ impl AggregateExec { &group_expr_mapping, &mode, &input_order_mode, - aggr_expr.as_slice(), + aggr_expr.as_ref(), )?; let mut exec = AggregateExec { mode, group_by, - aggr_expr, + aggr_expr: aggr_expr.into(), filter_expr, input, schema, @@ -1370,9 +1377,9 @@ impl ExecutionPlan for AggregateExec { ) -> Result> { let mut me = AggregateExec::try_new_with_schema( self.mode, - self.group_by.clone(), - self.aggr_expr.clone(), - self.filter_expr.clone(), + Arc::clone(&self.group_by), + self.aggr_expr.to_vec(), + Arc::clone(&self.filter_expr), Arc::clone(&children[0]), Arc::clone(&self.input_schema), Arc::clone(&self.schema), diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index eb9b6766ab8ee..fe8942097ac93 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -62,7 +62,7 @@ struct AggregateStreamInner { mode: AggregateMode, input: SendableRecordBatchStream, aggregate_expressions: Vec>>, - filter_expressions: Vec>>, + filter_expressions: Arc<[Option>]>, // ==== Runtime States/Buffers ==== accumulators: Vec, @@ -277,7 +277,7 @@ impl AggregateStream { partition: usize, ) -> Result { let agg_schema = Arc::clone(&agg.schema); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -285,7 +285,7 @@ impl AggregateStream { let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?; let filter_expressions = match agg.mode.input_mode() { AggregateInputMode::Raw => agg_filter_expr, - AggregateInputMode::Partial => vec![None; agg.aggr_expr.len()], + AggregateInputMode::Partial => vec![None; agg.aggr_expr.len()].into(), }; let accumulators = create_accumulators(&agg.aggr_expr)?; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index b2cf396b1500d..de857370ce285 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -377,10 +377,10 @@ pub(crate) struct GroupedHashAggregateStream { /// /// For example, for an aggregate like `SUM(x) FILTER (WHERE x >= 100)`, /// the filter expression is `x > 100`. - filter_expressions: Vec>>, + filter_expressions: Arc<[Option>]>, /// GROUP BY expressions - group_by: PhysicalGroupBy, + group_by: Arc, /// max rows in output RecordBatches batch_size: usize, @@ -465,8 +465,8 @@ impl GroupedHashAggregateStream { ) -> Result { debug!("Creating GroupedHashAggregateStream"); let agg_schema = Arc::clone(&agg.schema); - let agg_group_by = agg.group_by.clone(); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_group_by = Arc::clone(&agg.group_by); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let batch_size = context.session_config().batch_size(); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -475,7 +475,7 @@ impl GroupedHashAggregateStream { let timer = baseline_metrics.elapsed_compute().timer(); - let aggregate_exprs = agg.aggr_expr.clone(); + let aggregate_exprs = Arc::clone(&agg.aggr_expr); // arguments for each aggregate, one vec of expressions per // aggregate @@ -493,7 +493,7 @@ impl GroupedHashAggregateStream { let filter_expressions = match agg.mode.input_mode() { AggregateInputMode::Raw => agg_filter_expr, - AggregateInputMode::Partial => vec![None; agg.aggr_expr.len()], + AggregateInputMode::Partial => vec![None; agg.aggr_expr.len()].into(), }; // Instantiate the accumulators diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index 72c5d0c86745d..4aa566ccfcd0a 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -50,7 +50,7 @@ pub struct GroupedTopKAggregateStream { baseline_metrics: BaselineMetrics, group_by_metrics: GroupByMetrics, aggregate_arguments: Vec>>, - group_by: PhysicalGroupBy, + group_by: Arc, priority_map: PriorityMap, } @@ -62,7 +62,7 @@ impl GroupedTopKAggregateStream { limit: usize, ) -> Result { let agg_schema = Arc::clone(&aggr.schema); - let group_by = aggr.group_by.clone(); + let group_by = Arc::clone(&aggr.group_by); let input = aggr.input.execute(partition, Arc::clone(context))?; let baseline_metrics = BaselineMetrics::new(&aggr.metrics, partition); let group_by_metrics = GroupByMetrics::new(&aggr.metrics, partition); diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 32dc60b56ad48..590f6f09e8b9e 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -181,7 +181,7 @@ pub fn compute_record_batch_statistics( /// Checks if the given projection is valid for the given schema. pub fn can_project( schema: &arrow::datatypes::SchemaRef, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result<()> { match projection { Some(columns) => { diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 0acf419e67f90..abd7b72fc963c 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -20,6 +20,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, ready}; +use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use itertools::Itertools; use super::{ @@ -85,7 +86,7 @@ pub struct FilterExec { /// Properties equivalence properties, partitioning, etc. cache: PlanProperties, /// The projection indices of the columns in the output schema of join - projection: Option>, + projection: Option, /// Target batch size for output batches batch_size: usize, /// Number of rows to fetch @@ -96,7 +97,7 @@ pub struct FilterExec { pub struct FilterExecBuilder { predicate: Arc, input: Arc, - projection: Option>, + projection: Option, default_selectivity: u8, batch_size: usize, fetch: Option, @@ -136,18 +137,19 @@ impl FilterExecBuilder { /// /// If no projection is currently set, the new projection is used directly. /// If `None` is passed, the projection is cleared. - pub fn apply_projection(mut self, projection: Option>) -> Result { + pub fn apply_projection(self, projection: Option>) -> Result { + let projection = projection.map(Into::into); + self.apply_projection_by_ref(projection.as_ref()) + } + + /// The same as [`Self::apply_projection`] but takes projection shared reference. + pub fn apply_projection_by_ref( + mut self, + projection: Option<&ProjectionRef>, + ) -> Result { // Check if the projection is valid against current output schema - can_project(&self.input.schema(), projection.as_ref())?; - self.projection = match projection { - Some(new_proj) => match &self.projection { - Some(existing_proj) => { - Some(new_proj.iter().map(|i| existing_proj[*i]).collect()) - } - None => Some(new_proj), - }, - None => None, - }; + can_project(&self.input.schema(), projection.map(AsRef::as_ref))?; + self.projection = combine_projections(projection, self.projection.as_ref())?; Ok(self) } @@ -189,16 +191,14 @@ impl FilterExecBuilder { } // Validate projection if provided - if let Some(ref proj) = self.projection { - can_project(&self.input.schema(), Some(proj))?; - } + can_project(&self.input.schema(), self.projection.as_deref())?; // Compute properties once with all parameters let cache = FilterExec::compute_properties( &self.input, &self.predicate, self.default_selectivity, - self.projection.as_ref(), + self.projection.as_deref(), )?; Ok(FilterExec { @@ -302,8 +302,8 @@ impl FilterExec { } /// Projection - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() + pub fn projection(&self) -> &Option { + &self.projection } /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. @@ -380,7 +380,7 @@ impl FilterExec { input: &Arc, predicate: &Arc, default_selectivity: u8, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Combine the equal predicates with the input equivalence properties // to construct the equivalence properties: @@ -419,7 +419,7 @@ impl FilterExec { if let Some(projection) = projection { let schema = eq_properties.schema(); let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; - let out_schema = project_schema(schema, Some(projection))?; + let out_schema = project_schema(schema, Some(&projection))?; output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); @@ -664,7 +664,7 @@ impl ExecutionPlan for FilterExec { let new_predicate = conjunction(unhandled_filters); let updated_node = if new_predicate.eq(&lit(true)) { // FilterExec is no longer needed, but we may need to leave a projection in place - match self.projection() { + match self.projection().as_ref() { Some(projection_indices) => { let filter_child_schema = filter_input.schema(); let proj_exprs = projection_indices @@ -700,7 +700,7 @@ impl ExecutionPlan for FilterExec { &filter_input, &new_predicate, self.default_selectivity, - self.projection.as_ref(), + self.projection.as_deref(), )?, projection: self.projection.clone(), batch_size: self.batch_size, @@ -812,7 +812,7 @@ struct FilterExecStream { /// Runtime metrics recording metrics: FilterExecMetrics, /// The projection indices of the columns in the input schema - projection: Option>, + projection: Option, /// Batch coalescer to combine small batches batch_coalescer: LimitedBatchCoalescer, } @@ -903,8 +903,8 @@ impl Stream for FilterExecStream { .evaluate(&batch) .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { - Ok(match self.projection { - Some(ref projection) => { + Ok(match self.projection.as_ref() { + Some(projection) => { let projected_batch = batch.project(projection)?; (array, projected_batch) }, @@ -1725,7 +1725,7 @@ mod tests { .build()?; // Verify projection is set correctly - assert_eq!(filter.projection(), Some(&vec![0, 2])); + assert_eq!(filter.projection(), &Some([0, 2].into())); // Verify schema contains only projected columns let output_schema = filter.schema(); @@ -1755,7 +1755,7 @@ mod tests { let filter = FilterExecBuilder::new(predicate, input).build()?; // Verify no projection is set - assert_eq!(filter.projection(), None); + assert!(filter.projection().is_none()); // Verify schema contains all columns let output_schema = filter.schema(); @@ -1969,7 +1969,7 @@ mod tests { .build()?; // Verify composed projection is [0, 3] - assert_eq!(filter.projection(), Some(&vec![0, 3])); + assert_eq!(filter.projection(), &Some([0, 3].into())); // Verify schema contains only columns a and d let output_schema = filter.schema(); @@ -2003,7 +2003,7 @@ mod tests { .build()?; // Projection should be cleared - assert_eq!(filter.projection(), None); + assert_eq!(filter.projection(), &None); // Schema should have all columns let output_schema = filter.schema(); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c249dfb10aacf..a330ad54cb338 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -81,6 +81,7 @@ use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; use ahash::RandomState; @@ -246,6 +247,172 @@ impl JoinLeftData { } } +/// Helps to build [`HashJoinExec`]. +pub struct HashJoinExecBuilder { + left: Arc, + right: Arc, + on: Vec<(PhysicalExprRef, PhysicalExprRef)>, + join_type: JoinType, + filter: Option, + projection: Option, + partition_mode: PartitionMode, + null_equality: NullEquality, + null_aware: bool, +} + +impl HashJoinExecBuilder { + /// Make a new [`HashJoinExecBuilder`]. + pub fn new( + left: Arc, + right: Arc, + on: Vec<(PhysicalExprRef, PhysicalExprRef)>, + join_type: JoinType, + ) -> Self { + Self { + left, + right, + on, + filter: None, + projection: None, + partition_mode: PartitionMode::Auto, + join_type, + null_equality: NullEquality::NullEqualsNothing, + null_aware: false, + } + } + + /// Set projection from the vector. + pub fn with_projection(self, projection: Option>) -> Self { + self.with_projection_ref(projection.map(Into::into)) + } + + /// Set projection from the shared reference. + pub fn with_projection_ref(mut self, projection: Option) -> Self { + self.projection = projection; + self + } + + /// Set optional filter. + pub fn with_filter(mut self, filter: Option) -> Self { + self.filter = filter; + self + } + + /// Set partition mode. + pub fn with_partition_mode(mut self, mode: PartitionMode) -> Self { + self.partition_mode = mode; + self + } + + /// Set null equality property. + pub fn with_null_equality(mut self, null_equality: NullEquality) -> Self { + self.null_equality = null_equality; + self + } + + /// Set null aware property. + pub fn with_null_aware(mut self, null_aware: bool) -> Self { + self.null_aware = null_aware; + self + } + + /// Build resulting execution plan. + pub fn build(self) -> Result { + let Self { + left, + right, + on, + join_type, + filter, + projection, + partition_mode, + null_equality, + null_aware, + } = self; + + let left_schema = left.schema(); + let right_schema = right.schema(); + if on.is_empty() { + return plan_err!("On constraints in HashJoinExec should be non-empty"); + } + + check_join_is_valid(&left_schema, &right_schema, &on)?; + + // Validate null_aware flag + if null_aware { + if !matches!(join_type, JoinType::LeftAnti) { + return plan_err!( + "null_aware can only be true for LeftAnti joins, got {join_type}" + ); + } + if on.len() != 1 { + return plan_err!( + "null_aware anti join only supports single column join key, got {} columns", + on.len() + ); + } + } + + let (join_schema, column_indices) = + build_join_schema(&left_schema, &right_schema, &join_type); + + let random_state = HASH_JOIN_SEED; + + let join_schema = Arc::new(join_schema); + + // check if the projection is valid + can_project(&join_schema, projection.as_deref())?; + + let cache = HashJoinExec::compute_properties( + &left, + &right, + &join_schema, + join_type, + &on, + partition_mode, + projection.as_deref(), + )?; + + // Initialize both dynamic filter and bounds accumulator to None + // They will be set later if dynamic filtering is enabled + + Ok(HashJoinExec { + left, + right, + on, + filter, + join_type, + join_schema, + left_fut: Default::default(), + random_state, + mode: partition_mode, + metrics: ExecutionPlanMetricsSet::new(), + projection, + column_indices, + null_equality, + null_aware, + cache, + dynamic_filter: None, + }) + } +} + +impl From<&HashJoinExec> for HashJoinExecBuilder { + fn from(exec: &HashJoinExec) -> Self { + Self { + left: Arc::clone(exec.left()), + right: Arc::clone(exec.right()), + on: exec.on.clone(), + join_type: exec.join_type, + filter: exec.filter.clone(), + projection: exec.projection.clone(), + partition_mode: exec.mode, + null_equality: exec.null_equality, + null_aware: exec.null_aware, + } + } +} + #[expect(rustdoc::private_intra_doc_links)] /// Join execution plan: Evaluates equijoin predicates in parallel on multiple /// partitions using a hash table and an optional filter list to apply post @@ -466,7 +633,7 @@ pub struct HashJoinExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// The projection indices of the columns in the output schema of join - pub projection: Option>, + pub projection: Option, /// Information of index and left / right placement of columns column_indices: Vec, /// The equality null-handling behavior of the join algorithm. @@ -535,70 +702,13 @@ impl HashJoinExec { null_equality: NullEquality, null_aware: bool, ) -> Result { - let left_schema = left.schema(); - let right_schema = right.schema(); - if on.is_empty() { - return plan_err!("On constraints in HashJoinExec should be non-empty"); - } - - check_join_is_valid(&left_schema, &right_schema, &on)?; - - // Validate null_aware flag - if null_aware { - if !matches!(join_type, JoinType::LeftAnti) { - return plan_err!( - "null_aware can only be true for LeftAnti joins, got {join_type}" - ); - } - if on.len() != 1 { - return plan_err!( - "null_aware anti join only supports single column join key, got {} columns", - on.len() - ); - } - } - - let (join_schema, column_indices) = - build_join_schema(&left_schema, &right_schema, join_type); - - let random_state = HASH_JOIN_SEED; - - let join_schema = Arc::new(join_schema); - - // check if the projection is valid - can_project(&join_schema, projection.as_ref())?; - - let cache = Self::compute_properties( - &left, - &right, - &join_schema, - *join_type, - &on, - partition_mode, - projection.as_ref(), - )?; - - // Initialize both dynamic filter and bounds accumulator to None - // They will be set later if dynamic filtering is enabled - - Ok(HashJoinExec { - left, - right, - on, - filter, - join_type: *join_type, - join_schema, - left_fut: Default::default(), - random_state, - mode: partition_mode, - metrics: ExecutionPlanMetricsSet::new(), - projection, - column_indices, - null_equality, - null_aware, - cache, - dynamic_filter: None, - }) + HashJoinExecBuilder::new(left, right, on, *join_type) + .with_filter(filter) + .with_projection(projection) + .with_partition_mode(partition_mode) + .with_null_equality(null_equality) + .with_null_aware(null_aware) + .build() } fn create_dynamic_filter(on: &JoinOn) -> Arc { @@ -687,26 +797,14 @@ impl HashJoinExec { /// Return new instance of [HashJoinExec] with the given projection. pub fn with_projection(&self, projection: Option>) -> Result { + let projection = projection.map(Into::into); // check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; - Self::try_new( - Arc::clone(&self.left), - Arc::clone(&self.right), - self.on.clone(), - self.filter.clone(), - &self.join_type, - projection, - self.mode, - self.null_equality, - self.null_aware, - ) + can_project(&self.schema(), projection.as_deref())?; + let projection = + combine_projections(projection.as_ref(), self.projection.as_ref())?; + HashJoinExecBuilder::from(self) + .with_projection_ref(projection) + .build() } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -717,7 +815,7 @@ impl HashJoinExec { join_type: JoinType, on: JoinOnRef, mode: PartitionMode, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -769,7 +867,7 @@ impl HashJoinExec { if let Some(projection) = projection { // construct a map from the input expressions to the output expression of the Projection let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; - let out_schema = project_schema(schema, Some(projection))?; + let out_schema = project_schema(schema, Some(&projection))?; output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); @@ -824,7 +922,7 @@ impl HashJoinExec { swap_join_projection( left.schema().fields().len(), right.schema().fields().len(), - self.projection.as_ref(), + self.projection.as_deref(), self.join_type(), ), partition_mode, @@ -1020,7 +1118,7 @@ impl ExecutionPlan for HashJoinExec { self.join_type, &self.on, self.mode, - self.projection.as_ref(), + self.projection.as_deref(), )?, // Keep the dynamic filter, bounds accumulator will be reset dynamic_filter: self.dynamic_filter.clone(), @@ -1181,7 +1279,7 @@ impl ExecutionPlan for HashJoinExec { let right_stream = self.right.execute(partition, context)?; // update column indices to reflect the projection - let column_indices_after_projection = match &self.projection { + let column_indices_after_projection = match self.projection.as_ref() { Some(projection) => projection .iter() .map(|i| self.column_indices[*i].clone()) diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs b/datafusion/physical-plan/src/joins/hash_join/mod.rs index 8592e1d968535..b915802ea4015 100644 --- a/datafusion/physical-plan/src/joins/hash_join/mod.rs +++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs @@ -17,7 +17,7 @@ //! [`HashJoinExec`] Partitioned Hash Join Operator -pub use exec::HashJoinExec; +pub use exec::{HashJoinExec, HashJoinExecBuilder}; pub use partitioned_hash_eval::{HashExpr, HashTableLookupExpr, SeededRandomState}; mod exec; diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 848d0472fe885..2cdfa1e6ac020 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -20,8 +20,10 @@ use arrow::array::BooleanBufferBuilder; pub use cross_join::CrossJoinExec; use datafusion_physical_expr::PhysicalExprRef; -pub use hash_join::{HashExpr, HashJoinExec, HashTableLookupExpr, SeededRandomState}; -pub use nested_loop_join::NestedLoopJoinExec; +pub use hash_join::{ + HashExpr, HashJoinExec, HashJoinExecBuilder, HashTableLookupExpr, SeededRandomState, +}; +pub use nested_loop_join::{NestedLoopJoinExec, NestedLoopJoinExecBuilder}; use parking_lot::Mutex; // Note: SortMergeJoin is not used in plans yet pub use piecewise_merge_join::PiecewiseMergeJoinExec; diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index b57f9132253bf..e6bc26c34cb44 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -71,6 +71,7 @@ use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; +use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use futures::{Stream, StreamExt, TryStreamExt}; use log::debug; use parking_lot::Mutex; @@ -192,7 +193,7 @@ pub struct NestedLoopJoinExec { /// Information of index and left / right placement of columns column_indices: Vec, /// Projection to apply to the output of the join - projection: Option>, + projection: Option, /// Execution metrics metrics: ExecutionPlanMetricsSet, @@ -200,34 +201,76 @@ pub struct NestedLoopJoinExec { cache: PlanProperties, } -impl NestedLoopJoinExec { - /// Try to create a new [`NestedLoopJoinExec`] - pub fn try_new( +/// Helps to build [`NestedLoopJoinExec`]. +pub struct NestedLoopJoinExecBuilder { + left: Arc, + right: Arc, + join_type: JoinType, + filter: Option, + projection: Option, +} + +impl NestedLoopJoinExecBuilder { + /// Make a new [`NestedLoopJoinExecBuilder`]. + pub fn new( left: Arc, right: Arc, - filter: Option, - join_type: &JoinType, - projection: Option>, - ) -> Result { + join_type: JoinType, + ) -> Self { + Self { + left, + right, + join_type, + filter: None, + projection: None, + } + } + + /// Set projection from the vector. + pub fn with_projection(self, projection: Option>) -> Self { + self.with_projection_ref(projection.map(Into::into)) + } + + /// Set projection from the shared reference. + pub fn with_projection_ref(mut self, projection: Option) -> Self { + self.projection = projection; + self + } + + /// Set optional filter. + pub fn with_filter(mut self, filter: Option) -> Self { + self.filter = filter; + self + } + + /// Build resulting execution plan. + pub fn build(self) -> Result { + let Self { + left, + right, + join_type, + filter, + projection, + } = self; + let left_schema = left.schema(); let right_schema = right.schema(); check_join_is_valid(&left_schema, &right_schema, &[])?; let (join_schema, column_indices) = - build_join_schema(&left_schema, &right_schema, join_type); + build_join_schema(&left_schema, &right_schema, &join_type); let join_schema = Arc::new(join_schema); - let cache = Self::compute_properties( + let cache = NestedLoopJoinExec::compute_properties( &left, &right, &join_schema, - *join_type, - projection.as_ref(), + join_type, + projection.as_deref(), )?; - Ok(NestedLoopJoinExec { left, right, filter, - join_type: *join_type, + join_type, join_schema, build_side_data: Default::default(), column_indices, @@ -236,6 +279,34 @@ impl NestedLoopJoinExec { cache, }) } +} + +impl From<&NestedLoopJoinExec> for NestedLoopJoinExecBuilder { + fn from(exec: &NestedLoopJoinExec) -> Self { + Self { + left: Arc::clone(exec.left()), + right: Arc::clone(exec.right()), + join_type: exec.join_type, + filter: exec.filter.clone(), + projection: exec.projection.clone(), + } + } +} + +impl NestedLoopJoinExec { + /// Try to create a new [`NestedLoopJoinExec`] + pub fn try_new( + left: Arc, + right: Arc, + filter: Option, + join_type: &JoinType, + projection: Option>, + ) -> Result { + NestedLoopJoinExecBuilder::new(left, right, *join_type) + .with_projection(projection) + .with_filter(filter) + .build() + } /// left side pub fn left(&self) -> &Arc { @@ -257,8 +328,8 @@ impl NestedLoopJoinExec { &self.join_type } - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() + pub fn projection(&self) -> &Option { + &self.projection } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -267,7 +338,7 @@ impl NestedLoopJoinExec { right: &Arc, schema: &SchemaRef, join_type: JoinType, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -310,7 +381,7 @@ impl NestedLoopJoinExec { if let Some(projection) = projection { // construct a map from the input expressions to the output expression of the Projection let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; - let out_schema = project_schema(schema, Some(projection))?; + let out_schema = project_schema(schema, Some(&projection))?; output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); @@ -334,22 +405,14 @@ impl NestedLoopJoinExec { } pub fn with_projection(&self, projection: Option>) -> Result { + let projection = projection.map(Into::into); // check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; - Self::try_new( - Arc::clone(&self.left), - Arc::clone(&self.right), - self.filter.clone(), - &self.join_type, - projection, - ) + can_project(&self.schema(), projection.as_deref())?; + let projection = + combine_projections(projection.as_ref(), self.projection.as_ref())?; + NestedLoopJoinExecBuilder::from(self) + .with_projection_ref(projection) + .build() } /// Returns a new `ExecutionPlan` that runs NestedLoopsJoins with the left @@ -371,7 +434,7 @@ impl NestedLoopJoinExec { swap_join_projection( left.schema().fields().len(), right.schema().fields().len(), - self.projection.as_ref(), + self.projection.as_deref(), self.join_type(), ), )?; @@ -476,13 +539,16 @@ impl ExecutionPlan for NestedLoopJoinExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(NestedLoopJoinExec::try_new( - Arc::clone(&children[0]), - Arc::clone(&children[1]), - self.filter.clone(), - &self.join_type, - self.projection.clone(), - )?)) + Ok(Arc::new( + NestedLoopJoinExecBuilder::new( + Arc::clone(&children[0]), + Arc::clone(&children[1]), + self.join_type, + ) + .with_filter(self.filter.clone()) + .with_projection_ref(self.projection.clone()) + .build()?, + )) } fn execute( @@ -521,7 +587,7 @@ impl ExecutionPlan for NestedLoopJoinExec { let probe_side_data = self.right.execute(partition, context)?; // update column indices to reflect the projection - let column_indices_after_projection = match &self.projection { + let column_indices_after_projection = match self.projection.as_ref() { Some(projection) => projection .iter() .map(|i| self.column_indices[*i].clone()) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a9243fe04e28d..e709703e07d45 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1674,7 +1674,7 @@ fn swap_reverting_projection( pub fn swap_join_projection( left_schema_len: usize, right_schema_len: usize, - projection: Option<&Vec>, + projection: Option<&[usize]>, join_type: &JoinType, ) -> Option> { match join_type { @@ -1685,7 +1685,7 @@ pub fn swap_join_projection( | JoinType::RightAnti | JoinType::RightSemi | JoinType::LeftMark - | JoinType::RightMark => projection.cloned(), + | JoinType::RightMark => projection.map(|p| p.to_vec()), _ => projection.map(|p| { p.iter() .map(|i| { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index f00360292239f..76711a8f835f4 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -138,13 +138,19 @@ impl ProjectionExec { E: Into, { let input_schema = input.schema(); - // convert argument to Vec - let expr_vec = expr.into_iter().map(Into::into).collect::>(); - let projection = ProjectionExprs::new(expr_vec); + let expr_arc = expr.into_iter().map(Into::into).collect::>(); + let projection = ProjectionExprs::from_expressions(expr_arc); let projector = projection.make_projector(&input_schema)?; + Self::try_from_projector(projector, input) + } + fn try_from_projector( + projector: Projector, + input: Arc, + ) -> Result { // Construct a map from the input expressions to the output expression of the Projection - let projection_mapping = projection.projection_mapping(&input_schema)?; + let projection_mapping = + projector.projection().projection_mapping(&input.schema())?; let cache = Self::compute_properties( &input, &projection_mapping, @@ -305,8 +311,8 @@ impl ExecutionPlan for ProjectionExec { self: Arc, mut children: Vec>, ) -> Result> { - ProjectionExec::try_new( - self.projector.projection().clone(), + ProjectionExec::try_from_projector( + self.projector.clone(), children.swap_remove(0), ) .map(|p| Arc::new(p) as _) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index cb731ff04cb0c..60d8b5705bf35 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -3173,7 +3173,7 @@ impl protobuf::PhysicalPlanNode { right: Some(Box::new(right)), join_type: join_type.into(), filter, - projection: exec.projection().map_or_else(Vec::new, |v| { + projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { v.iter().map(|x| *x as u32).collect::>() }), },