From aa6db023b29952fe014301f625b25a593fa3d3ba Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 18 Mar 2026 21:36:55 +0800 Subject: [PATCH 01/10] Refactor push down filter benchmarks to use dynamic sweep points --- .../core/benches/sql_planner_extended.rs | 212 +++++++++--------- 1 file changed, 112 insertions(+), 100 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index d4955313c79c3..213577d6c24a2 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -27,12 +27,17 @@ use datafusion_expr::{cast, col, lit, not, try_cast, when}; use datafusion_functions::expr_fn::{ btrim, length, regexp_like, regexp_replace, to_timestamp, upper, }; +use std::env; use std::fmt::Write; use std::hint::black_box; use std::ops::Rem; use std::sync::Arc; use tokio::runtime::Runtime; +const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60]; +const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3]; +const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)]; + // This benchmark suite is designed to test the performance of // logical planning with a large plan containing unions, many columns // with a variety of operations in it. 
@@ -324,6 +329,27 @@ fn build_non_case_left_join_df_with_push_down_filter( rt.block_on(async { ctx.sql(&query).await.unwrap() }) } +fn include_full_push_down_filter_sweep() -> bool { + env::var("DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP") + .map(|value| value == "1" || value.eq_ignore_ascii_case("true")) + .unwrap_or(false) +} + +fn push_down_filter_sweep_points() -> Vec<(usize, usize)> { + if include_full_push_down_filter_sweep() { + FULL_DEPTH_SWEEP + .into_iter() + .flat_map(|depth| { + FULL_PREDICATE_SWEEP + .into_iter() + .map(move |predicate_count| (predicate_count, depth)) + }) + .collect() + } else { + DEFAULT_SWEEP_POINTS.to_vec() + } +} + fn criterion_benchmark(c: &mut Criterion) { let baseline_ctx = SessionContext::new(); let case_heavy_ctx = SessionContext::new(); @@ -349,115 +375,101 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - let predicate_sweep = [10, 20, 30, 40, 60]; - let case_depth_sweep = [1, 2, 3]; + let sweep_points = push_down_filter_sweep_points(); let mut hotspot_group = c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab"); - for case_depth in case_depth_sweep { - for predicate_count in predicate_sweep { - let with_push_down_filter = - build_case_heavy_left_join_df_with_push_down_filter( - &rt, - predicate_count, - case_depth, - true, - ); - let without_push_down_filter = - build_case_heavy_left_join_df_with_push_down_filter( - &rt, - predicate_count, - case_depth, - false, - ); - - let input_label = - format!("predicates={predicate_count},case_depth={case_depth}"); - // A/B interpretation: - // - with_push_down_filter: default optimizer path (rule enabled) - // - without_push_down_filter: control path with the rule removed - // Compare both IDs at the same sweep point to isolate rule impact. 
- hotspot_group.bench_with_input( - BenchmarkId::new("with_push_down_filter", &input_label), - &with_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - hotspot_group.bench_with_input( - BenchmarkId::new("without_push_down_filter", &input_label), - &without_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, + hotspot_group.sample_size(10); + for &(predicate_count, case_depth) in &sweep_points { + let with_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter( + &rt, + predicate_count, + case_depth, + true, + ); + let without_push_down_filter = + build_case_heavy_left_join_df_with_push_down_filter( + &rt, + predicate_count, + case_depth, + false, ); - } + + let input_label = format!("predicates={predicate_count},case_depth={case_depth}"); + // A/B interpretation: + // - with_push_down_filter: default optimizer path (rule enabled) + // - without_push_down_filter: control path with the rule removed + // Compare both IDs at the same sweep point to isolate rule impact. 
+ hotspot_group.bench_with_input( + BenchmarkId::new("with_push_down_filter", &input_label), + &with_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); + hotspot_group.bench_with_input( + BenchmarkId::new("without_push_down_filter", &input_label), + &without_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); } hotspot_group.finish(); let mut control_group = c.benchmark_group("push_down_filter_control_non_case_left_join_ab"); - for nesting_depth in case_depth_sweep { - for predicate_count in predicate_sweep { - let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( - &rt, - predicate_count, - nesting_depth, - true, - ); - let without_push_down_filter = - build_non_case_left_join_df_with_push_down_filter( - &rt, - predicate_count, - nesting_depth, - false, - ); - - let input_label = - format!("predicates={predicate_count},nesting_depth={nesting_depth}"); - control_group.bench_with_input( - BenchmarkId::new("with_push_down_filter", &input_label), - &with_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - control_group.bench_with_input( - BenchmarkId::new("without_push_down_filter", &input_label), - &without_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - } + control_group.sample_size(10); + for &(predicate_count, nesting_depth) in &sweep_points { + let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( + &rt, + predicate_count, + nesting_depth, + true, + ); + let without_push_down_filter = 
build_non_case_left_join_df_with_push_down_filter( + &rt, + predicate_count, + nesting_depth, + false, + ); + + let input_label = + format!("predicates={predicate_count},nesting_depth={nesting_depth}"); + control_group.bench_with_input( + BenchmarkId::new("with_push_down_filter", &input_label), + &with_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); + control_group.bench_with_input( + BenchmarkId::new("without_push_down_filter", &input_label), + &without_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); } control_group.finish(); } From 52bf5c98bfafcf42b5e910ef6427c4f3e4fd8302 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 18 Mar 2026 21:46:58 +0800 Subject: [PATCH 02/10] Add dynamic sample size configuration for push down filter benchmarks --- datafusion/core/benches/sql_planner_extended.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 213577d6c24a2..97431929ff53f 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -37,6 +37,7 @@ use tokio::runtime::Runtime; const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60]; const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3]; const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)]; +const DEFAULT_SAMPLE_SIZE: usize = 10; // This benchmark suite is designed to test the performance of // logical planning with a large plan containing unions, many columns @@ -350,6 +351,14 @@ fn push_down_filter_sweep_points() -> Vec<(usize, usize)> { } } +fn push_down_filter_sample_size() -> usize { + env::var("DATAFUSION_PUSH_DOWN_FILTER_SAMPLE_SIZE") + .ok() + 
.and_then(|value| value.parse::().ok()) + .filter(|sample_size| *sample_size >= 10) + .unwrap_or(DEFAULT_SAMPLE_SIZE) +} + fn criterion_benchmark(c: &mut Criterion) { let baseline_ctx = SessionContext::new(); let case_heavy_ctx = SessionContext::new(); @@ -376,10 +385,11 @@ fn criterion_benchmark(c: &mut Criterion) { }); let sweep_points = push_down_filter_sweep_points(); + let sample_size = push_down_filter_sample_size(); let mut hotspot_group = c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab"); - hotspot_group.sample_size(10); + hotspot_group.sample_size(sample_size); for &(predicate_count, case_depth) in &sweep_points { let with_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter( &rt, @@ -429,7 +439,7 @@ fn criterion_benchmark(c: &mut Criterion) { let mut control_group = c.benchmark_group("push_down_filter_control_non_case_left_join_ab"); - control_group.sample_size(10); + control_group.sample_size(sample_size); for &(predicate_count, nesting_depth) in &sweep_points { let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( &rt, From f996e370c74634621d76baf5b469f608e53cebd4 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 18 Mar 2026 21:59:59 +0800 Subject: [PATCH 03/10] Remove unused sample size function and constant from push down filter benchmarks --- datafusion/core/benches/sql_planner_extended.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 97431929ff53f..ccd7d3f3f031b 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -37,7 +37,6 @@ use tokio::runtime::Runtime; const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60]; const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3]; const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)]; -const DEFAULT_SAMPLE_SIZE: usize = 10; // This 
benchmark suite is designed to test the performance of // logical planning with a large plan containing unions, many columns @@ -351,14 +350,6 @@ fn push_down_filter_sweep_points() -> Vec<(usize, usize)> { } } -fn push_down_filter_sample_size() -> usize { - env::var("DATAFUSION_PUSH_DOWN_FILTER_SAMPLE_SIZE") - .ok() - .and_then(|value| value.parse::().ok()) - .filter(|sample_size| *sample_size >= 10) - .unwrap_or(DEFAULT_SAMPLE_SIZE) -} - fn criterion_benchmark(c: &mut Criterion) { let baseline_ctx = SessionContext::new(); let case_heavy_ctx = SessionContext::new(); @@ -385,11 +376,8 @@ fn criterion_benchmark(c: &mut Criterion) { }); let sweep_points = push_down_filter_sweep_points(); - let sample_size = push_down_filter_sample_size(); - let mut hotspot_group = c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab"); - hotspot_group.sample_size(sample_size); for &(predicate_count, case_depth) in &sweep_points { let with_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter( &rt, @@ -439,7 +427,6 @@ fn criterion_benchmark(c: &mut Criterion) { let mut control_group = c.benchmark_group("push_down_filter_control_non_case_left_join_ab"); - control_group.sample_size(sample_size); for &(predicate_count, nesting_depth) in &sweep_points { let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( &rt, From f91fa15306c00343c2cb48ed806e8e17bd486c5d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 20 Mar 2026 11:49:15 +0800 Subject: [PATCH 04/10] feat(benchmarks): refactor push down filter benchmarks for improved organization - Refactored functions to build left join DataFrames with push down filters. - Created `bench_push_down_filter_ab` for streamlined benchmarking of push down filter impact. - Updated benchmark groups to use the new building functions, enhancing readability and maintainability. 
--- .../core/benches/sql_planner_extended.rs | 193 ++++++++++-------- 1 file changed, 105 insertions(+), 88 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index ccd7d3f3f031b..012587b064700 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -18,7 +18,10 @@ use arrow::array::{ArrayRef, RecordBatch}; use arrow_schema::DataType; use arrow_schema::TimeUnit::Nanosecond; -use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use criterion::{ + BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main, + measurement::WallTime, +}; use datafusion::prelude::{DataFrame, SessionContext}; use datafusion_catalog::MemTable; use datafusion_common::ScalarValue; @@ -309,10 +312,11 @@ fn build_non_case_left_join_query( query } -fn build_non_case_left_join_df_with_push_down_filter( +fn build_left_join_df_with_push_down_filter( rt: &Runtime, + query_builder: impl Fn(usize, usize) -> String, predicate_count: usize, - nesting_depth: usize, + depth: usize, push_down_filter_enabled: bool, ) -> DataFrame { let ctx = SessionContext::new(); @@ -325,10 +329,40 @@ fn build_non_case_left_join_df_with_push_down_filter( ); } - let query = build_non_case_left_join_query(predicate_count, nesting_depth); + let query = query_builder(predicate_count, depth); rt.block_on(async { ctx.sql(&query).await.unwrap() }) } +fn build_case_heavy_left_join_df_with_push_down_filter( + rt: &Runtime, + predicate_count: usize, + case_depth: usize, + push_down_filter_enabled: bool, +) -> DataFrame { + build_left_join_df_with_push_down_filter( + rt, + build_case_heavy_left_join_query, + predicate_count, + case_depth, + push_down_filter_enabled, + ) +} + +fn build_non_case_left_join_df_with_push_down_filter( + rt: &Runtime, + predicate_count: usize, + nesting_depth: usize, + push_down_filter_enabled: bool, +) -> DataFrame { + 
build_left_join_df_with_push_down_filter( + rt, + build_non_case_left_join_query, + predicate_count, + nesting_depth, + push_down_filter_enabled, + ) +} + fn include_full_push_down_filter_sweep() -> bool { env::var("DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP") .map(|value| value == "1" || value.eq_ignore_ascii_case("true")) @@ -350,6 +384,48 @@ fn push_down_filter_sweep_points() -> Vec<(usize, usize)> { } } +fn bench_push_down_filter_ab( + group: &mut BenchmarkGroup<'_, WallTime>, + rt: &Runtime, + sweep_points: &[(usize, usize)], + build_df: BuildFn, +) where + BuildFn: Fn(&Runtime, usize, usize, bool) -> DataFrame, +{ + for &(predicate_count, depth) in sweep_points { + let with_push_down_filter = build_df(rt, predicate_count, depth, true); + let without_push_down_filter = build_df(rt, predicate_count, depth, false); + + let input_label = format!("predicates={predicate_count},nesting_depth={depth}"); + + group.bench_with_input( + BenchmarkId::new("with_push_down_filter", &input_label), + &with_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); + + group.bench_with_input( + BenchmarkId::new("without_push_down_filter", &input_label), + &without_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); + } +} + fn criterion_benchmark(c: &mut Criterion) { let baseline_ctx = SessionContext::new(); let case_heavy_ctx = SessionContext::new(); @@ -376,98 +452,39 @@ fn criterion_benchmark(c: &mut Criterion) { }); let sweep_points = push_down_filter_sweep_points(); + let mut hotspot_group = c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab"); - for &(predicate_count, case_depth) in &sweep_points { - let with_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter( - &rt, - predicate_count, - case_depth, - true, - 
); - let without_push_down_filter = + bench_push_down_filter_ab( + &mut hotspot_group, + &rt, + &sweep_points, + |rt, predicate_count, depth, enable| { build_case_heavy_left_join_df_with_push_down_filter( - &rt, + rt, predicate_count, - case_depth, - false, - ); - - let input_label = format!("predicates={predicate_count},case_depth={case_depth}"); - // A/B interpretation: - // - with_push_down_filter: default optimizer path (rule enabled) - // - without_push_down_filter: control path with the rule removed - // Compare both IDs at the same sweep point to isolate rule impact. - hotspot_group.bench_with_input( - BenchmarkId::new("with_push_down_filter", &input_label), - &with_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), - ); - }) - }, - ); - hotspot_group.bench_with_input( - BenchmarkId::new("without_push_down_filter", &input_label), - &without_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), - ); - }) - }, - ); - } + depth, + enable, + ) + }, + ); hotspot_group.finish(); let mut control_group = c.benchmark_group("push_down_filter_control_non_case_left_join_ab"); - for &(predicate_count, nesting_depth) in &sweep_points { - let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( - &rt, - predicate_count, - nesting_depth, - true, - ); - let without_push_down_filter = build_non_case_left_join_df_with_push_down_filter( - &rt, - predicate_count, - nesting_depth, - false, - ); - - let input_label = - format!("predicates={predicate_count},nesting_depth={nesting_depth}"); - control_group.bench_with_input( - BenchmarkId::new("with_push_down_filter", &input_label), - &with_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), - ); - }) - }, - ); - 
control_group.bench_with_input( - BenchmarkId::new("without_push_down_filter", &input_label), - &without_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), - ); - }) - }, - ); - } + bench_push_down_filter_ab( + &mut control_group, + &rt, + &sweep_points, + |rt, predicate_count, depth, enable| { + build_non_case_left_join_df_with_push_down_filter( + rt, + predicate_count, + depth, + enable, + ) + }, + ); control_group.finish(); } From e170b320cc8155b660e1bbda398f113474141420 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 20 Mar 2026 12:59:49 +0800 Subject: [PATCH 05/10] fix(benchmarks): remove duplicate function `build_case_heavy_left_join_df_with_push_down_filter` This commit removes the `build_case_heavy_left_join_df_with_push_down_filter` duplicate function from the `sql_planner_extended.rs` benchmark file --- .../core/benches/sql_planner_extended.rs | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 012587b064700..73141bd8668e4 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -260,26 +260,6 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) - query } -fn build_case_heavy_left_join_df_with_push_down_filter( - rt: &Runtime, - predicate_count: usize, - case_depth: usize, - push_down_filter_enabled: bool, -) -> DataFrame { - let ctx = SessionContext::new(); - register_string_table(&ctx, 100, 1000); - if !push_down_filter_enabled { - let removed = ctx.remove_optimizer_rule("push_down_filter"); - assert!( - removed, - "push_down_filter rule should be present in the default optimizer" - ); - } - - let query = build_case_heavy_left_join_query(predicate_count, case_depth); - rt.block_on(async { ctx.sql(&query).await.unwrap() }) -} - fn 
build_non_case_left_join_query( predicate_count: usize, nesting_depth: usize, From 6b9d8d292a2634190b8aae37661f1687a285107d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 30 Mar 2026 12:10:26 +0800 Subject: [PATCH 06/10] feat(benches): enhance case-heavy left join benchmarks with inference checks - Added assertions to verify inference candidates in case-heavy left join data frames. - Introduced helper functions `find_filter_predicates` and `assert_case_heavy_left_join_inference_candidates` for better structure and readability. - Updated join logic in `build_case_heavy_left_join_query` for more complex case handling. This update improves the robustness of benchmarks by ensuring correctness in filter predicate references related to join keys. --- .../core/benches/sql_planner_extended.rs | 65 ++++++++++++++++--- 1 file changed, 57 insertions(+), 8 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 73141bd8668e4..7db50b19bf566 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -24,8 +24,10 @@ use criterion::{ }; use datafusion::prelude::{DataFrame, SessionContext}; use datafusion_catalog::MemTable; -use datafusion_common::ScalarValue; +use datafusion_common::{Column, ScalarValue}; use datafusion_expr::Expr::Literal; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::utils::split_conjunction_owned; use datafusion_expr::{cast, col, lit, not, try_cast, when}; use datafusion_functions::expr_fn::{ btrim, length, regexp_like, regexp_replace, to_timestamp, upper, @@ -226,7 +228,9 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame { fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame { register_string_table(ctx, 100, 1000); let query = build_case_heavy_left_join_query(30, 1); - rt.block_on(async { ctx.sql(&query).await.unwrap() }) + let df = 
rt.block_on(async { ctx.sql(&query).await.unwrap() }); + assert_case_heavy_left_join_inference_candidates(&df, 30); + df } fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -> String { @@ -245,12 +249,17 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) - query.push_str(" AND "); } - let mut expr = format!("length(l.c{})", i % 20); + let left_payload_col = (i % 19) + 1; + let right_payload_col = ((i + 7) % 19) + 1; + let mut expr = format!( + "CASE WHEN l.c0 IS NOT NULL THEN length(l.c{left_payload_col}) ELSE length(r.c{right_payload_col}) END" + ); for depth in 0..case_depth { - let left_col = (i + depth + 1) % 20; - let right_col = (i + depth + 2) % 20; + let left_col = ((i + depth + 3) % 19) + 1; + let right_col = ((i + depth + 11) % 19) + 1; + let join_key_ref = if (i + depth) % 2 == 0 { "l.c0" } else { "r.c0" }; expr = format!( - "CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE length(r.c{right_col}) END" + "CASE WHEN {join_key_ref} IS NOT NULL THEN {expr} ELSE CASE WHEN l.c{left_col} IS NOT NULL THEN length(l.c{left_col}) ELSE length(r.c{right_col}) END END" ); } @@ -319,13 +328,15 @@ fn build_case_heavy_left_join_df_with_push_down_filter( case_depth: usize, push_down_filter_enabled: bool, ) -> DataFrame { - build_left_join_df_with_push_down_filter( + let df = build_left_join_df_with_push_down_filter( rt, build_case_heavy_left_join_query, predicate_count, case_depth, push_down_filter_enabled, - ) + ); + assert_case_heavy_left_join_inference_candidates(&df, predicate_count); + df } fn build_non_case_left_join_df_with_push_down_filter( @@ -343,6 +354,44 @@ fn build_non_case_left_join_df_with_push_down_filter( ) } +fn find_filter_predicates(plan: &LogicalPlan) -> Vec { + match plan { + LogicalPlan::Filter(filter) => split_conjunction_owned(filter.predicate.clone()), + LogicalPlan::Projection(projection) => { + find_filter_predicates(projection.input.as_ref()) + } + other => { + panic!("expected 
benchmark query plan to contain a Filter, found {other:?}") + } + } +} + +fn assert_case_heavy_left_join_inference_candidates( + df: &DataFrame, + expected_predicate_count: usize, +) { + let predicates = find_filter_predicates(df.logical_plan()); + assert_eq!(predicates.len(), expected_predicate_count); + + let left_join_key = Column::from_qualified_name("l.c0"); + let right_join_key = Column::from_qualified_name("r.c0"); + + for predicate in predicates { + let column_refs = predicate.column_refs(); + assert!( + column_refs.contains(&&left_join_key) + || column_refs.contains(&&right_join_key), + "benchmark predicate should reference a join key: {predicate}" + ); + assert!( + column_refs + .iter() + .any(|col| **col != left_join_key && **col != right_join_key), + "benchmark predicate should reference a non-join column: {predicate}" + ); + } +} + fn include_full_push_down_filter_sweep() -> bool { env::var("DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP") .map(|value| value == "1" || value.eq_ignore_ascii_case("true")) From 35feedbeb81303a6ce4d07c7fc6a4191281f46bb Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 31 Mar 2026 18:27:47 +0800 Subject: [PATCH 07/10] refactor: streamline benchmarks for push down filter with improved query handling - Removed unnecessary functions and constants related to push down filter sweep configurations. - Simplified the logic for constructing test DataFrames, focusing on essential parameters for benchmarks. - Enhanced clarity of the benchmarks by differentiating cases for `with_push_down_filter` and `without_push_down_filter`. - Updated the implementation to improve readability and maintainability. 
--- .../core/benches/sql_planner_extended.rs | 319 ++++++++---------- 1 file changed, 132 insertions(+), 187 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 7db50b19bf566..d4955313c79c3 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -18,31 +18,21 @@ use arrow::array::{ArrayRef, RecordBatch}; use arrow_schema::DataType; use arrow_schema::TimeUnit::Nanosecond; -use criterion::{ - BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main, - measurement::WallTime, -}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; use datafusion::prelude::{DataFrame, SessionContext}; use datafusion_catalog::MemTable; -use datafusion_common::{Column, ScalarValue}; +use datafusion_common::ScalarValue; use datafusion_expr::Expr::Literal; -use datafusion_expr::logical_plan::LogicalPlan; -use datafusion_expr::utils::split_conjunction_owned; use datafusion_expr::{cast, col, lit, not, try_cast, when}; use datafusion_functions::expr_fn::{ btrim, length, regexp_like, regexp_replace, to_timestamp, upper, }; -use std::env; use std::fmt::Write; use std::hint::black_box; use std::ops::Rem; use std::sync::Arc; use tokio::runtime::Runtime; -const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60]; -const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3]; -const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)]; - // This benchmark suite is designed to test the performance of // logical planning with a large plan containing unions, many columns // with a variety of operations in it. 
@@ -228,9 +218,7 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame { fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame { register_string_table(ctx, 100, 1000); let query = build_case_heavy_left_join_query(30, 1); - let df = rt.block_on(async { ctx.sql(&query).await.unwrap() }); - assert_case_heavy_left_join_inference_candidates(&df, 30); - df + rt.block_on(async { ctx.sql(&query).await.unwrap() }) } fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -> String { @@ -249,17 +237,12 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) - query.push_str(" AND "); } - let left_payload_col = (i % 19) + 1; - let right_payload_col = ((i + 7) % 19) + 1; - let mut expr = format!( - "CASE WHEN l.c0 IS NOT NULL THEN length(l.c{left_payload_col}) ELSE length(r.c{right_payload_col}) END" - ); + let mut expr = format!("length(l.c{})", i % 20); for depth in 0..case_depth { - let left_col = ((i + depth + 3) % 19) + 1; - let right_col = ((i + depth + 11) % 19) + 1; - let join_key_ref = if (i + depth) % 2 == 0 { "l.c0" } else { "r.c0" }; + let left_col = (i + depth + 1) % 20; + let right_col = (i + depth + 2) % 20; expr = format!( - "CASE WHEN {join_key_ref} IS NOT NULL THEN {expr} ELSE CASE WHEN l.c{left_col} IS NOT NULL THEN length(l.c{left_col}) ELSE length(r.c{right_col}) END END" + "CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE length(r.c{right_col}) END" ); } @@ -269,6 +252,26 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) - query } +fn build_case_heavy_left_join_df_with_push_down_filter( + rt: &Runtime, + predicate_count: usize, + case_depth: usize, + push_down_filter_enabled: bool, +) -> DataFrame { + let ctx = SessionContext::new(); + register_string_table(&ctx, 100, 1000); + if !push_down_filter_enabled { + let removed = ctx.remove_optimizer_rule("push_down_filter"); + assert!( + removed, + "push_down_filter rule should 
be present in the default optimizer" + ); + } + + let query = build_case_heavy_left_join_query(predicate_count, case_depth); + rt.block_on(async { ctx.sql(&query).await.unwrap() }) +} + fn build_non_case_left_join_query( predicate_count: usize, nesting_depth: usize, @@ -301,11 +304,10 @@ fn build_non_case_left_join_query( query } -fn build_left_join_df_with_push_down_filter( +fn build_non_case_left_join_df_with_push_down_filter( rt: &Runtime, - query_builder: impl Fn(usize, usize) -> String, predicate_count: usize, - depth: usize, + nesting_depth: usize, push_down_filter_enabled: bool, ) -> DataFrame { let ctx = SessionContext::new(); @@ -318,143 +320,10 @@ fn build_left_join_df_with_push_down_filter( ); } - let query = query_builder(predicate_count, depth); + let query = build_non_case_left_join_query(predicate_count, nesting_depth); rt.block_on(async { ctx.sql(&query).await.unwrap() }) } -fn build_case_heavy_left_join_df_with_push_down_filter( - rt: &Runtime, - predicate_count: usize, - case_depth: usize, - push_down_filter_enabled: bool, -) -> DataFrame { - let df = build_left_join_df_with_push_down_filter( - rt, - build_case_heavy_left_join_query, - predicate_count, - case_depth, - push_down_filter_enabled, - ); - assert_case_heavy_left_join_inference_candidates(&df, predicate_count); - df -} - -fn build_non_case_left_join_df_with_push_down_filter( - rt: &Runtime, - predicate_count: usize, - nesting_depth: usize, - push_down_filter_enabled: bool, -) -> DataFrame { - build_left_join_df_with_push_down_filter( - rt, - build_non_case_left_join_query, - predicate_count, - nesting_depth, - push_down_filter_enabled, - ) -} - -fn find_filter_predicates(plan: &LogicalPlan) -> Vec<Expr> { - match plan { - LogicalPlan::Filter(filter) => split_conjunction_owned(filter.predicate.clone()), - LogicalPlan::Projection(projection) =>
- } -} - -fn assert_case_heavy_left_join_inference_candidates( - df: &DataFrame, - expected_predicate_count: usize, -) { - let predicates = find_filter_predicates(df.logical_plan()); - assert_eq!(predicates.len(), expected_predicate_count); - - let left_join_key = Column::from_qualified_name("l.c0"); - let right_join_key = Column::from_qualified_name("r.c0"); - - for predicate in predicates { - let column_refs = predicate.column_refs(); - assert!( - column_refs.contains(&&left_join_key) - || column_refs.contains(&&right_join_key), - "benchmark predicate should reference a join key: {predicate}" - ); - assert!( - column_refs - .iter() - .any(|col| **col != left_join_key && **col != right_join_key), - "benchmark predicate should reference a non-join column: {predicate}" - ); - } -} - -fn include_full_push_down_filter_sweep() -> bool { - env::var("DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP") - .map(|value| value == "1" || value.eq_ignore_ascii_case("true")) - .unwrap_or(false) -} - -fn push_down_filter_sweep_points() -> Vec<(usize, usize)> { - if include_full_push_down_filter_sweep() { - FULL_DEPTH_SWEEP - .into_iter() - .flat_map(|depth| { - FULL_PREDICATE_SWEEP - .into_iter() - .map(move |predicate_count| (predicate_count, depth)) - }) - .collect() - } else { - DEFAULT_SWEEP_POINTS.to_vec() - } -} - -fn bench_push_down_filter_ab( - group: &mut BenchmarkGroup<'_, WallTime>, - rt: &Runtime, - sweep_points: &[(usize, usize)], - build_df: BuildFn, -) where - BuildFn: Fn(&Runtime, usize, usize, bool) -> DataFrame, -{ - for &(predicate_count, depth) in sweep_points { - let with_push_down_filter = build_df(rt, predicate_count, depth, true); - let without_push_down_filter = build_df(rt, predicate_count, depth, false); - - let input_label = format!("predicates={predicate_count},nesting_depth={depth}"); - - group.bench_with_input( - BenchmarkId::new("with_push_down_filter", &input_label), - &with_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - 
black_box( - rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), - ); - }) - }, - ); - - group.bench_with_input( - BenchmarkId::new("without_push_down_filter", &input_label), - &without_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), - ); - }) - }, - ); - } -} - fn criterion_benchmark(c: &mut Criterion) { let baseline_ctx = SessionContext::new(); let case_heavy_ctx = SessionContext::new(); @@ -480,40 +349,116 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - let sweep_points = push_down_filter_sweep_points(); + let predicate_sweep = [10, 20, 30, 40, 60]; + let case_depth_sweep = [1, 2, 3]; let mut hotspot_group = c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab"); - bench_push_down_filter_ab( - &mut hotspot_group, - &rt, - &sweep_points, - |rt, predicate_count, depth, enable| { - build_case_heavy_left_join_df_with_push_down_filter( - rt, - predicate_count, - depth, - enable, - ) - }, - ); + for case_depth in case_depth_sweep { + for predicate_count in predicate_sweep { + let with_push_down_filter = + build_case_heavy_left_join_df_with_push_down_filter( + &rt, + predicate_count, + case_depth, + true, + ); + let without_push_down_filter = + build_case_heavy_left_join_df_with_push_down_filter( + &rt, + predicate_count, + case_depth, + false, + ); + + let input_label = + format!("predicates={predicate_count},case_depth={case_depth}"); + // A/B interpretation: + // - with_push_down_filter: default optimizer path (rule enabled) + // - without_push_down_filter: control path with the rule removed + // Compare both IDs at the same sweep point to isolate rule impact. 
+ hotspot_group.bench_with_input( + BenchmarkId::new("with_push_down_filter", &input_label), + &with_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { + df_clone.into_optimized_plan().unwrap() + }), + ); + }) + }, + ); + hotspot_group.bench_with_input( + BenchmarkId::new("without_push_down_filter", &input_label), + &without_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { + df_clone.into_optimized_plan().unwrap() + }), + ); + }) + }, + ); + } + } hotspot_group.finish(); let mut control_group = c.benchmark_group("push_down_filter_control_non_case_left_join_ab"); - bench_push_down_filter_ab( - &mut control_group, - &rt, - &sweep_points, - |rt, predicate_count, depth, enable| { - build_non_case_left_join_df_with_push_down_filter( - rt, + for nesting_depth in case_depth_sweep { + for predicate_count in predicate_sweep { + let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( + &rt, predicate_count, - depth, - enable, - ) - }, - ); + nesting_depth, + true, + ); + let without_push_down_filter = + build_non_case_left_join_df_with_push_down_filter( + &rt, + predicate_count, + nesting_depth, + false, + ); + + let input_label = + format!("predicates={predicate_count},nesting_depth={nesting_depth}"); + control_group.bench_with_input( + BenchmarkId::new("with_push_down_filter", &input_label), + &with_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { + df_clone.into_optimized_plan().unwrap() + }), + ); + }) + }, + ); + control_group.bench_with_input( + BenchmarkId::new("without_push_down_filter", &input_label), + &without_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { + df_clone.into_optimized_plan().unwrap() + }), + ); + }) + }, + ); + } + } control_group.finish(); } From 55d159aaccbf19fb7aa20948652d997f31f5f774 Mon Sep 
17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 31 Mar 2026 18:30:11 +0800 Subject: [PATCH 08/10] fix: simplify predicate extraction in SQL planner - Updated `find_filter_predicates` function to streamline the code by removing unnecessary line breaks and retaining clarity in the error message when the expected structure is not met. - Ensured that the function continues to accurately identify and handle logical plans with projections. --- .../core/benches/sql_planner_extended.rs | 319 ++++++++++-------- 1 file changed, 187 insertions(+), 132 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index d4955313c79c3..7db50b19bf566 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -18,21 +18,31 @@ use arrow::array::{ArrayRef, RecordBatch}; use arrow_schema::DataType; use arrow_schema::TimeUnit::Nanosecond; -use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use criterion::{ + BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main, + measurement::WallTime, +}; use datafusion::prelude::{DataFrame, SessionContext}; use datafusion_catalog::MemTable; -use datafusion_common::ScalarValue; +use datafusion_common::{Column, ScalarValue}; use datafusion_expr::Expr::Literal; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::utils::split_conjunction_owned; use datafusion_expr::{cast, col, lit, not, try_cast, when}; use datafusion_functions::expr_fn::{ btrim, length, regexp_like, regexp_replace, to_timestamp, upper, }; +use std::env; use std::fmt::Write; use std::hint::black_box; use std::ops::Rem; use std::sync::Arc; use tokio::runtime::Runtime; +const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60]; +const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3]; +const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)]; + // This benchmark suite is designed to test the 
performance of // logical planning with a large plan containing unions, many columns // with a variety of operations in it. @@ -218,7 +228,9 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame { fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame { register_string_table(ctx, 100, 1000); let query = build_case_heavy_left_join_query(30, 1); - rt.block_on(async { ctx.sql(&query).await.unwrap() }) + let df = rt.block_on(async { ctx.sql(&query).await.unwrap() }); + assert_case_heavy_left_join_inference_candidates(&df, 30); + df } fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -> String { @@ -237,12 +249,17 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) - query.push_str(" AND "); } - let mut expr = format!("length(l.c{})", i % 20); + let left_payload_col = (i % 19) + 1; + let right_payload_col = ((i + 7) % 19) + 1; + let mut expr = format!( + "CASE WHEN l.c0 IS NOT NULL THEN length(l.c{left_payload_col}) ELSE length(r.c{right_payload_col}) END" + ); for depth in 0..case_depth { - let left_col = (i + depth + 1) % 20; - let right_col = (i + depth + 2) % 20; + let left_col = ((i + depth + 3) % 19) + 1; + let right_col = ((i + depth + 11) % 19) + 1; + let join_key_ref = if (i + depth) % 2 == 0 { "l.c0" } else { "r.c0" }; expr = format!( - "CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE length(r.c{right_col}) END" + "CASE WHEN {join_key_ref} IS NOT NULL THEN {expr} ELSE CASE WHEN l.c{left_col} IS NOT NULL THEN length(l.c{left_col}) ELSE length(r.c{right_col}) END END" ); } @@ -252,26 +269,6 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) - query } -fn build_case_heavy_left_join_df_with_push_down_filter( - rt: &Runtime, - predicate_count: usize, - case_depth: usize, - push_down_filter_enabled: bool, -) -> DataFrame { - let ctx = SessionContext::new(); - register_string_table(&ctx, 100, 1000); - if 
!push_down_filter_enabled { - let removed = ctx.remove_optimizer_rule("push_down_filter"); - assert!( - removed, - "push_down_filter rule should be present in the default optimizer" - ); - } - - let query = build_case_heavy_left_join_query(predicate_count, case_depth); - rt.block_on(async { ctx.sql(&query).await.unwrap() }) -} - fn build_non_case_left_join_query( predicate_count: usize, nesting_depth: usize, @@ -304,10 +301,11 @@ fn build_non_case_left_join_query( query } -fn build_non_case_left_join_df_with_push_down_filter( +fn build_left_join_df_with_push_down_filter( rt: &Runtime, + query_builder: impl Fn(usize, usize) -> String, predicate_count: usize, - nesting_depth: usize, + depth: usize, push_down_filter_enabled: bool, ) -> DataFrame { let ctx = SessionContext::new(); @@ -320,10 +318,143 @@ fn build_non_case_left_join_df_with_push_down_filter( ); } - let query = build_non_case_left_join_query(predicate_count, nesting_depth); + let query = query_builder(predicate_count, depth); rt.block_on(async { ctx.sql(&query).await.unwrap() }) } +fn build_case_heavy_left_join_df_with_push_down_filter( + rt: &Runtime, + predicate_count: usize, + case_depth: usize, + push_down_filter_enabled: bool, +) -> DataFrame { + let df = build_left_join_df_with_push_down_filter( + rt, + build_case_heavy_left_join_query, + predicate_count, + case_depth, + push_down_filter_enabled, + ); + assert_case_heavy_left_join_inference_candidates(&df, predicate_count); + df +} + +fn build_non_case_left_join_df_with_push_down_filter( + rt: &Runtime, + predicate_count: usize, + nesting_depth: usize, + push_down_filter_enabled: bool, +) -> DataFrame { + build_left_join_df_with_push_down_filter( + rt, + build_non_case_left_join_query, + predicate_count, + nesting_depth, + push_down_filter_enabled, + ) +} + +fn find_filter_predicates(plan: &LogicalPlan) -> Vec<Expr> { + match plan { + LogicalPlan::Filter(filter) => split_conjunction_owned(filter.predicate.clone()), + LogicalPlan::Projection(projection) =>
{ + find_filter_predicates(projection.input.as_ref()) + } + other => { + panic!("expected benchmark query plan to contain a Filter, found {other:?}") + } + } +} + +fn assert_case_heavy_left_join_inference_candidates( + df: &DataFrame, + expected_predicate_count: usize, +) { + let predicates = find_filter_predicates(df.logical_plan()); + assert_eq!(predicates.len(), expected_predicate_count); + + let left_join_key = Column::from_qualified_name("l.c0"); + let right_join_key = Column::from_qualified_name("r.c0"); + + for predicate in predicates { + let column_refs = predicate.column_refs(); + assert!( + column_refs.contains(&&left_join_key) + || column_refs.contains(&&right_join_key), + "benchmark predicate should reference a join key: {predicate}" + ); + assert!( + column_refs + .iter() + .any(|col| **col != left_join_key && **col != right_join_key), + "benchmark predicate should reference a non-join column: {predicate}" + ); + } +} + +fn include_full_push_down_filter_sweep() -> bool { + env::var("DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP") + .map(|value| value == "1" || value.eq_ignore_ascii_case("true")) + .unwrap_or(false) +} + +fn push_down_filter_sweep_points() -> Vec<(usize, usize)> { + if include_full_push_down_filter_sweep() { + FULL_DEPTH_SWEEP + .into_iter() + .flat_map(|depth| { + FULL_PREDICATE_SWEEP + .into_iter() + .map(move |predicate_count| (predicate_count, depth)) + }) + .collect() + } else { + DEFAULT_SWEEP_POINTS.to_vec() + } +} + +fn bench_push_down_filter_ab( + group: &mut BenchmarkGroup<'_, WallTime>, + rt: &Runtime, + sweep_points: &[(usize, usize)], + build_df: BuildFn, +) where + BuildFn: Fn(&Runtime, usize, usize, bool) -> DataFrame, +{ + for &(predicate_count, depth) in sweep_points { + let with_push_down_filter = build_df(rt, predicate_count, depth, true); + let without_push_down_filter = build_df(rt, predicate_count, depth, false); + + let input_label = format!("predicates={predicate_count},nesting_depth={depth}"); + + 
group.bench_with_input( + BenchmarkId::new("with_push_down_filter", &input_label), + &with_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); + + group.bench_with_input( + BenchmarkId::new("without_push_down_filter", &input_label), + &without_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); + } +} + fn criterion_benchmark(c: &mut Criterion) { let baseline_ctx = SessionContext::new(); let case_heavy_ctx = SessionContext::new(); @@ -349,116 +480,40 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - let predicate_sweep = [10, 20, 30, 40, 60]; - let case_depth_sweep = [1, 2, 3]; + let sweep_points = push_down_filter_sweep_points(); let mut hotspot_group = c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab"); - for case_depth in case_depth_sweep { - for predicate_count in predicate_sweep { - let with_push_down_filter = - build_case_heavy_left_join_df_with_push_down_filter( - &rt, - predicate_count, - case_depth, - true, - ); - let without_push_down_filter = - build_case_heavy_left_join_df_with_push_down_filter( - &rt, - predicate_count, - case_depth, - false, - ); - - let input_label = - format!("predicates={predicate_count},case_depth={case_depth}"); - // A/B interpretation: - // - with_push_down_filter: default optimizer path (rule enabled) - // - without_push_down_filter: control path with the rule removed - // Compare both IDs at the same sweep point to isolate rule impact. 
- hotspot_group.bench_with_input( - BenchmarkId::new("with_push_down_filter", &input_label), - &with_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - hotspot_group.bench_with_input( - BenchmarkId::new("without_push_down_filter", &input_label), - &without_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - } - } + bench_push_down_filter_ab( + &mut hotspot_group, + &rt, + &sweep_points, + |rt, predicate_count, depth, enable| { + build_case_heavy_left_join_df_with_push_down_filter( + rt, + predicate_count, + depth, + enable, + ) + }, + ); hotspot_group.finish(); let mut control_group = c.benchmark_group("push_down_filter_control_non_case_left_join_ab"); - for nesting_depth in case_depth_sweep { - for predicate_count in predicate_sweep { - let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( - &rt, + bench_push_down_filter_ab( + &mut control_group, + &rt, + &sweep_points, + |rt, predicate_count, depth, enable| { + build_non_case_left_join_df_with_push_down_filter( + rt, predicate_count, - nesting_depth, - true, - ); - let without_push_down_filter = - build_non_case_left_join_df_with_push_down_filter( - &rt, - predicate_count, - nesting_depth, - false, - ); - - let input_label = - format!("predicates={predicate_count},nesting_depth={nesting_depth}"); - control_group.bench_with_input( - BenchmarkId::new("with_push_down_filter", &input_label), - &with_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - control_group.bench_with_input( - BenchmarkId::new("without_push_down_filter", &input_label), - &without_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = 
df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - } - } + depth, + enable, + ) + }, + ); control_group.finish(); } From e2af6276e128d8ed2d2d08922daecc43a8ad2913 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 1 Apr 2026 13:28:52 +0800 Subject: [PATCH 09/10] feat(benchmarks): enhance left join benchmark with push down filter tests - Refactored case heavy and non-case left join benchmark functions to include push down filter tests. - Added utility functions to configure benchmark sweeps for push down filters, making it customizable via environment variables. - Improved assertions for filter predicates in case heavy left join inference. - Cleaned up and organized existing benchmark code for clarity and reuse. --- datafusion/core/benches/sql_planner_extended.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 7db50b19bf566..767134bb5bafd 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -357,12 +357,8 @@ fn find_filter_predicates(plan: &LogicalPlan) -> Vec<Expr> { match plan { LogicalPlan::Filter(filter) => split_conjunction_owned(filter.predicate.clone()), - LogicalPlan::Projection(projection) => { - find_filter_predicates(projection.input.as_ref()) - } - other => { - panic!("expected benchmark query plan to contain a Filter, found {other:?}") - } + LogicalPlan::Projection(projection) => find_filter_predicates(projection.input.as_ref()), + other => panic!("expected benchmark query plan to contain a Filter, found {other:?}"), } } @@ -379,8 +375,7 @@ fn assert_case_heavy_left_join_inference_candidates( for predicate in predicates { let column_refs = predicate.column_refs(); assert!( - column_refs.contains(&&left_join_key) - ||
column_refs.contains(&&right_join_key), + column_refs.contains(&&left_join_key) || column_refs.contains(&&right_join_key), "benchmark predicate should reference a join key: {predicate}" ); assert!( From 5ce2ee191c2efa59effd366cab194d4662f2fb4a Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 1 Apr 2026 13:33:49 +0800 Subject: [PATCH 10/10] cargo fmt --- datafusion/core/benches/sql_planner_extended.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 767134bb5bafd..7db50b19bf566 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -357,8 +357,12 @@ fn build_non_case_left_join_df_with_push_down_filter( fn find_filter_predicates(plan: &LogicalPlan) -> Vec { match plan { LogicalPlan::Filter(filter) => split_conjunction_owned(filter.predicate.clone()), - LogicalPlan::Projection(projection) => find_filter_predicates(projection.input.as_ref()), - other => panic!("expected benchmark query plan to contain a Filter, found {other:?}"), + LogicalPlan::Projection(projection) => { + find_filter_predicates(projection.input.as_ref()) + } + other => { + panic!("expected benchmark query plan to contain a Filter, found {other:?}") + } } } @@ -375,7 +379,8 @@ fn assert_case_heavy_left_join_inference_candidates( for predicate in predicates { let column_refs = predicate.column_refs(); assert!( - column_refs.contains(&&left_join_key) || column_refs.contains(&&right_join_key), + column_refs.contains(&&left_join_key) + || column_refs.contains(&&right_join_key), "benchmark predicate should reference a join key: {predicate}" ); assert!(