diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs index 563d62e91b..c2b144b7dd 100644 --- a/native/core/src/execution/expressions/mod.rs +++ b/native/core/src/execution/expressions/mod.rs @@ -22,6 +22,8 @@ pub mod bitwise; pub mod comparison; pub mod logical; pub mod nullcheck; +pub mod partition; +pub mod random; pub mod strings; pub mod subquery; pub mod temporal; diff --git a/native/core/src/execution/expressions/partition.rs b/native/core/src/execution/expressions/partition.rs new file mode 100644 index 0000000000..4b0287f8c6 --- /dev/null +++ b/native/core/src/execution/expressions/partition.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::execution::operators::ExecutionError; +use crate::execution::planner::expression_registry::ExpressionBuilder; +use crate::execution::planner::PhysicalPlanner; +use arrow::datatypes::SchemaRef; +use datafusion::common::ScalarValue; +use datafusion::physical_expr::expressions::Literal; +use datafusion::physical_expr::PhysicalExpr; +use datafusion_comet_proto::spark_expression::Expr; +use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncreasingId; +use std::sync::Arc; + +pub struct SparkPartitionIdBuilder; + +impl ExpressionBuilder for SparkPartitionIdBuilder { + fn build( + &self, + _spark_expr: &Expr, + _input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + Ok(Arc::new(Literal::new(ScalarValue::Int32(Some( + planner.partition(), + ))))) + } +} + +pub struct MonotonicallyIncreasingIdBuilder; + +impl ExpressionBuilder for MonotonicallyIncreasingIdBuilder { + fn build( + &self, + _spark_expr: &Expr, + _input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + Ok(Arc::new(MonotonicallyIncreasingId::from_partition_id( + planner.partition(), + ))) + } +} diff --git a/native/core/src/execution/expressions/random.rs b/native/core/src/execution/expressions/random.rs new file mode 100644 index 0000000000..5ea6092cb0 --- /dev/null +++ b/native/core/src/execution/expressions/random.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::execution::operators::ExecutionError; +use crate::execution::planner::expression_registry::ExpressionBuilder; +use crate::execution::planner::PhysicalPlanner; +use crate::extract_expr; +use arrow::datatypes::SchemaRef; +use datafusion::physical_expr::PhysicalExpr; +use datafusion_comet_proto::spark_expression::Expr; +use datafusion_comet_spark_expr::{RandExpr, RandnExpr}; +use std::sync::Arc; + +pub struct RandBuilder; + +impl ExpressionBuilder for RandBuilder { + fn build( + &self, + spark_expr: &Expr, + _input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + let expr = extract_expr!(spark_expr, Rand); + let seed = expr.seed.wrapping_add(planner.partition().into()); + Ok(Arc::new(RandExpr::new(seed))) + } +} + +pub struct RandnBuilder; + +impl ExpressionBuilder for RandnBuilder { + fn build( + &self, + spark_expr: &Expr, + _input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result, ExecutionError> { + let expr = extract_expr!(spark_expr, Randn); + let seed = expr.seed.wrapping_add(planner.partition().into()); + Ok(Arc::new(RandnExpr::new(seed))) + } +} diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 15bbabe883..bd37755922 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -123,12 +123,11 @@ use datafusion_comet_proto::{ }, spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, }; -use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncreasingId; use datafusion_comet_spark_expr::{ ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct, DecimalRescaleCheckOverflow, GetArrayStructFields, GetStructField, IfExpr, ListExtract, - NormalizeNaNAndZero, RandExpr, RandnExpr, SparkCastOptions, Stddev, SumDecimal, ToJson, - UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp, + NormalizeNaNAndZero, SparkCastOptions, Stddev, SumDecimal, ToJson, UnboundColumn, Variance, + WideDecimalBinaryExpr, WideDecimalOp, }; use itertools::Itertools; use jni::objects::GlobalRef; @@ -197,6 +196,11 @@ impl PhysicalPlanner { &self.session_ctx } + /// Return partition id of this planner. + pub fn partition(&self) -> i32 { + self.partition + } + /// get DataFusion PartitionedFiles from a Spark FilePartition fn get_partitioned_files( &self, @@ -655,20 +659,6 @@ impl PhysicalPlanner { expr.legacy_negative_index, ))) } - ExprStruct::Rand(expr) => { - let seed = expr.seed.wrapping_add(self.partition.into()); - Ok(Arc::new(RandExpr::new(seed))) - } - ExprStruct::Randn(expr) => { - let seed = expr.seed.wrapping_add(self.partition.into()); - Ok(Arc::new(RandnExpr::new(seed))) - } - ExprStruct::SparkPartitionId(_) => Ok(Arc::new(DataFusionLiteral::new( - ScalarValue::Int32(Some(self.partition)), - ))), - ExprStruct::MonotonicallyIncreasingId(_) => Ok(Arc::new( - MonotonicallyIncreasingId::from_partition_id(self.partition), - )), ExprStruct::ToCsv(expr) => { let csv_struct_expr = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs index 34aa3de179..bf3904d9c1 100644 --- a/native/core/src/execution/planner/expression_registry.rs +++ b/native/core/src/execution/planner/expression_registry.rs @@ -184,6 +184,12 @@ impl ExpressionRegistry { // Register temporal expressions self.register_temporal_expressions(); + + // Register random expressions + self.register_random_expressions(); + + // Register partition expressions + self.register_partition_expressions(); } /// Register arithmetic expression builders @@ -386,4 +392,28 @@ impl ExpressionRegistry { )), } } + + /// Register random expression builders + fn register_random_expressions(&mut self) { + use crate::execution::expressions::random::*; + + self.builders + .insert(ExpressionType::Rand, Box::new(RandBuilder)); + self.builders + .insert(ExpressionType::Randn, Box::new(RandnBuilder)); + } + + /// Register partition expression builders + fn register_partition_expressions(&mut self) { + use crate::execution::expressions::partition::*; + + self.builders.insert( + ExpressionType::SparkPartitionId, + Box::new(SparkPartitionIdBuilder), + ); + self.builders.insert( + ExpressionType::MonotonicallyIncreasingId, + Box::new(MonotonicallyIncreasingIdBuilder), + ); + } }