diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index d4955313c79c3..7db50b19bf566 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -18,21 +18,31 @@ use arrow::array::{ArrayRef, RecordBatch}; use arrow_schema::DataType; use arrow_schema::TimeUnit::Nanosecond; -use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use criterion::{ + BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main, + measurement::WallTime, +}; use datafusion::prelude::{DataFrame, SessionContext}; use datafusion_catalog::MemTable; -use datafusion_common::ScalarValue; +use datafusion_common::{Column, ScalarValue}; use datafusion_expr::Expr::Literal; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::utils::split_conjunction_owned; use datafusion_expr::{cast, col, lit, not, try_cast, when}; use datafusion_functions::expr_fn::{ btrim, length, regexp_like, regexp_replace, to_timestamp, upper, }; +use std::env; use std::fmt::Write; use std::hint::black_box; use std::ops::Rem; use std::sync::Arc; use tokio::runtime::Runtime; +const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60]; +const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3]; +const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)]; + // This benchmark suite is designed to test the performance of // logical planning with a large plan containing unions, many columns // with a variety of operations in it. 
@@ -218,7 +228,9 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame { fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame { register_string_table(ctx, 100, 1000); let query = build_case_heavy_left_join_query(30, 1); - rt.block_on(async { ctx.sql(&query).await.unwrap() }) + let df = rt.block_on(async { ctx.sql(&query).await.unwrap() }); + assert_case_heavy_left_join_inference_candidates(&df, 30); + df } fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -> String { @@ -237,12 +249,17 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) - query.push_str(" AND "); } - let mut expr = format!("length(l.c{})", i % 20); + let left_payload_col = (i % 19) + 1; + let right_payload_col = ((i + 7) % 19) + 1; + let mut expr = format!( + "CASE WHEN l.c0 IS NOT NULL THEN length(l.c{left_payload_col}) ELSE length(r.c{right_payload_col}) END" + ); for depth in 0..case_depth { - let left_col = (i + depth + 1) % 20; - let right_col = (i + depth + 2) % 20; + let left_col = ((i + depth + 3) % 19) + 1; + let right_col = ((i + depth + 11) % 19) + 1; + let join_key_ref = if (i + depth) % 2 == 0 { "l.c0" } else { "r.c0" }; expr = format!( - "CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE length(r.c{right_col}) END" + "CASE WHEN {join_key_ref} IS NOT NULL THEN {expr} ELSE CASE WHEN l.c{left_col} IS NOT NULL THEN length(l.c{left_col}) ELSE length(r.c{right_col}) END END" ); } @@ -252,26 +269,6 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) - query } -fn build_case_heavy_left_join_df_with_push_down_filter( - rt: &Runtime, - predicate_count: usize, - case_depth: usize, - push_down_filter_enabled: bool, -) -> DataFrame { - let ctx = SessionContext::new(); - register_string_table(&ctx, 100, 1000); - if !push_down_filter_enabled { - let removed = ctx.remove_optimizer_rule("push_down_filter"); - assert!( - removed, - "push_down_filter rule should 
be present in the default optimizer" - ); - } - - let query = build_case_heavy_left_join_query(predicate_count, case_depth); - rt.block_on(async { ctx.sql(&query).await.unwrap() }) -} - fn build_non_case_left_join_query( predicate_count: usize, nesting_depth: usize, @@ -304,10 +301,11 @@ fn build_non_case_left_join_query( query } -fn build_non_case_left_join_df_with_push_down_filter( +fn build_left_join_df_with_push_down_filter( rt: &Runtime, + query_builder: impl Fn(usize, usize) -> String, predicate_count: usize, - nesting_depth: usize, + depth: usize, push_down_filter_enabled: bool, ) -> DataFrame { let ctx = SessionContext::new(); @@ -320,10 +318,143 @@ fn build_non_case_left_join_df_with_push_down_filter( ); } - let query = build_non_case_left_join_query(predicate_count, nesting_depth); + let query = query_builder(predicate_count, depth); rt.block_on(async { ctx.sql(&query).await.unwrap() }) } +fn build_case_heavy_left_join_df_with_push_down_filter( + rt: &Runtime, + predicate_count: usize, + case_depth: usize, + push_down_filter_enabled: bool, +) -> DataFrame { + let df = build_left_join_df_with_push_down_filter( + rt, + build_case_heavy_left_join_query, + predicate_count, + case_depth, + push_down_filter_enabled, + ); + assert_case_heavy_left_join_inference_candidates(&df, predicate_count); + df +} + +fn build_non_case_left_join_df_with_push_down_filter( + rt: &Runtime, + predicate_count: usize, + nesting_depth: usize, + push_down_filter_enabled: bool, +) -> DataFrame { + build_left_join_df_with_push_down_filter( + rt, + build_non_case_left_join_query, + predicate_count, + nesting_depth, + push_down_filter_enabled, + ) +} + +fn find_filter_predicates(plan: &LogicalPlan) -> Vec<Expr> { + match plan { + LogicalPlan::Filter(filter) => split_conjunction_owned(filter.predicate.clone()), + LogicalPlan::Projection(projection) => { + find_filter_predicates(projection.input.as_ref()) + } + other => { + panic!("expected benchmark query plan to contain a Filter, found 
{other:?}") + } + } +} + +fn assert_case_heavy_left_join_inference_candidates( + df: &DataFrame, + expected_predicate_count: usize, +) { + let predicates = find_filter_predicates(df.logical_plan()); + assert_eq!(predicates.len(), expected_predicate_count); + + let left_join_key = Column::from_qualified_name("l.c0"); + let right_join_key = Column::from_qualified_name("r.c0"); + + for predicate in predicates { + let column_refs = predicate.column_refs(); + assert!( + column_refs.contains(&&left_join_key) + || column_refs.contains(&&right_join_key), + "benchmark predicate should reference a join key: {predicate}" + ); + assert!( + column_refs + .iter() + .any(|col| **col != left_join_key && **col != right_join_key), + "benchmark predicate should reference a non-join column: {predicate}" + ); + } +} + +fn include_full_push_down_filter_sweep() -> bool { + env::var("DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP") + .map(|value| value == "1" || value.eq_ignore_ascii_case("true")) + .unwrap_or(false) +} + +fn push_down_filter_sweep_points() -> Vec<(usize, usize)> { + if include_full_push_down_filter_sweep() { + FULL_DEPTH_SWEEP + .into_iter() + .flat_map(|depth| { + FULL_PREDICATE_SWEEP + .into_iter() + .map(move |predicate_count| (predicate_count, depth)) + }) + .collect() + } else { + DEFAULT_SWEEP_POINTS.to_vec() + } +} + +fn bench_push_down_filter_ab<BuildFn>( + group: &mut BenchmarkGroup<'_, WallTime>, + rt: &Runtime, + sweep_points: &[(usize, usize)], + build_df: BuildFn, +) where + BuildFn: Fn(&Runtime, usize, usize, bool) -> DataFrame, +{ + for &(predicate_count, depth) in sweep_points { + let with_push_down_filter = build_df(rt, predicate_count, depth, true); + let without_push_down_filter = build_df(rt, predicate_count, depth, false); + + let input_label = format!("predicates={predicate_count},nesting_depth={depth}"); + + group.bench_with_input( + BenchmarkId::new("with_push_down_filter", &input_label), + &with_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = 
df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); + + group.bench_with_input( + BenchmarkId::new("without_push_down_filter", &input_label), + &without_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); + } +} + fn criterion_benchmark(c: &mut Criterion) { let baseline_ctx = SessionContext::new(); let case_heavy_ctx = SessionContext::new(); @@ -349,116 +480,40 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - let predicate_sweep = [10, 20, 30, 40, 60]; - let case_depth_sweep = [1, 2, 3]; + let sweep_points = push_down_filter_sweep_points(); let mut hotspot_group = c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab"); - for case_depth in case_depth_sweep { - for predicate_count in predicate_sweep { - let with_push_down_filter = - build_case_heavy_left_join_df_with_push_down_filter( - &rt, - predicate_count, - case_depth, - true, - ); - let without_push_down_filter = - build_case_heavy_left_join_df_with_push_down_filter( - &rt, - predicate_count, - case_depth, - false, - ); - - let input_label = - format!("predicates={predicate_count},case_depth={case_depth}"); - // A/B interpretation: - // - with_push_down_filter: default optimizer path (rule enabled) - // - without_push_down_filter: control path with the rule removed - // Compare both IDs at the same sweep point to isolate rule impact. 
- hotspot_group.bench_with_input( - BenchmarkId::new("with_push_down_filter", &input_label), - &with_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - hotspot_group.bench_with_input( - BenchmarkId::new("without_push_down_filter", &input_label), - &without_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - } - } + bench_push_down_filter_ab( + &mut hotspot_group, + &rt, + &sweep_points, + |rt, predicate_count, depth, enable| { + build_case_heavy_left_join_df_with_push_down_filter( + rt, + predicate_count, + depth, + enable, + ) + }, + ); hotspot_group.finish(); let mut control_group = c.benchmark_group("push_down_filter_control_non_case_left_join_ab"); - for nesting_depth in case_depth_sweep { - for predicate_count in predicate_sweep { - let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( - &rt, + bench_push_down_filter_ab( + &mut control_group, + &rt, + &sweep_points, + |rt, predicate_count, depth, enable| { + build_non_case_left_join_df_with_push_down_filter( + rt, predicate_count, - nesting_depth, - true, - ); - let without_push_down_filter = - build_non_case_left_join_df_with_push_down_filter( - &rt, - predicate_count, - nesting_depth, - false, - ); - - let input_label = - format!("predicates={predicate_count},nesting_depth={nesting_depth}"); - control_group.bench_with_input( - BenchmarkId::new("with_push_down_filter", &input_label), - &with_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - control_group.bench_with_input( - BenchmarkId::new("without_push_down_filter", &input_label), - &without_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = 
df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - } - } + depth, + enable, + ) + }, + ); control_group.finish(); }