Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit. Hold shift + click to select a range.
b9828ca
fix: optimize null predicate evaluation by early exit for non-restric…
kosiew Mar 16, 2026
88e4455
Add test case for a > b in join key predicates
kosiew Mar 16, 2026
df73001
Refactor column membership check into a helper function
kosiew Mar 16, 2026
144cab3
Clarify null-restricting behavior in filter check
kosiew Mar 16, 2026
9ef45f0
Simplify column check and null-restrict handling
kosiew Mar 16, 2026
3d3945c
refactor: streamline null predicate evaluation by introducing predica…
kosiew Mar 16, 2026
17009ae
Improve SQL boolean/null semantics handling
kosiew Mar 18, 2026
515da96
Add regression tests for SQL shape coverage
kosiew Mar 19, 2026
115b9f2
Refactor null predicate evaluation and add tests
kosiew Mar 19, 2026
38680f5
Refactor null handling in expression evaluation
kosiew Mar 19, 2026
ebd70ef
Test window scalar subquery optimizer delta
kosiew Mar 19, 2026
9665c63
Avoid join filter rewrite for scalar subqueries
kosiew Mar 19, 2026
00dcba4
new
kosiew Mar 19, 2026
12de382
Simplify null handling and filter push down logic
kosiew Mar 19, 2026
4fdeb7c
clippy fix
kosiew Mar 19, 2026
1d12172
Amend benchmark
kosiew Mar 20, 2026
2c5096d
Fix alias for scalar aggregate in push_down_filter
kosiew Mar 20, 2026
718cdf6
Refactor optimizer: remove test-only controls
kosiew Mar 20, 2026
deb6799
Add domain-specific evaluator for null restrictions
kosiew Mar 20, 2026
aa030ad
Improve null_restriction with documentation and evaluator
kosiew Mar 20, 2026
9cae403
Rename and clarify filter promotion function
kosiew Mar 20, 2026
11098c1
Refactor pushdown filter logic for clarity
kosiew Mar 20, 2026
84ca1b9
Add null restriction eval mode and test controls
kosiew Mar 20, 2026
fae294d
Refactor SQL planner for left join with filters
kosiew Mar 20, 2026
0624204
Refactor null restriction logic in null_restriction.rs
kosiew Mar 20, 2026
91acb3f
Make NullRestrictionEvalMode test-only
kosiew Mar 20, 2026
6cce56a
amend benchmark
kosiew Mar 20, 2026
2832040
Optimize join predicate handling and add regression test
kosiew Mar 27, 2026
e09533f
Optimize candidate-building flow in join predicates
kosiew Mar 27, 2026
69864d6
Enhance join query to generate complex CASE predicates
kosiew Mar 27, 2026
23e38a1
Add guard helper for CASE-heavy left join inference
kosiew Mar 27, 2026
3e65e22
Refactor promotion filter for join conditions
kosiew Mar 27, 2026
1b89409
Refine returns_exactly_one_row helper behavior
kosiew Mar 27, 2026
0a1feaa
Allow post-join filter promotion based on conditions
kosiew Mar 27, 2026
52713fa
Add support for limited row cross joins
kosiew Mar 27, 2026
4f90984
Add test for exact one-row subquery promotion
kosiew Mar 27, 2026
33fcb56
Remove unrelated join-promotion behavior from PushDownFilter
kosiew Mar 27, 2026
4215b12
Add optimizer-level test for scalar subquery filter
kosiew Mar 27, 2026
526d346
Add focused tests for cross join and scalar subquery
kosiew Mar 27, 2026
ac59a56
Prevent join-condition promotion in specific cases
kosiew Mar 27, 2026
c09154f
Add fast path for mixed-reference predicates in authoritative mode
kosiew Mar 27, 2026
b1d6cba
Enhance null-restriction eval mode for safety
kosiew Mar 27, 2026
ba03e05
Rename helper for scalar subquery clarity
kosiew Mar 27, 2026
2f658b4
Add maintainer comment in null_restriction.rs
kosiew Mar 27, 2026
e4ffcd1
Reduce plan-shape sensitivity for scalar-subquery guard
kosiew Mar 27, 2026
a817b7d
Consolidate plan-wrapper traversal and optimizations
kosiew Mar 27, 2026
4c143ad
Separate semantics for evaluates_to_null and
kosiew Mar 27, 2026
18b7541
cargo fmt
kosiew Mar 27, 2026
106e963
Refactor join input handling and null evaluation
kosiew Mar 27, 2026
5effa4c
Refactor push_down_filter for simplicity and efficiency
kosiew Mar 27, 2026
0563f6c
Refactor utility layers and improve code compactness
kosiew Mar 27, 2026
8e1a2f2
Refactor utility layers and simplify null handling
kosiew Mar 27, 2026
a3bcb57
Revert to 787d0a4d3
kosiew Mar 27, 2026
37bcc07
Refactor tests for clarity and reusability
kosiew Mar 27, 2026
c936b6a
Refactor is_restrict_null_predicate in utils.rs
kosiew Mar 27, 2026
bacb0e3
benchmark 7 improvements, 2 regressions
kosiew Mar 28, 2026
847a8fa
Restore syntactic null-restriction fast path
kosiew Mar 28, 2026
6ef1c84
benchmark 13 improvements
kosiew Mar 30, 2026
5dd668a
Fix metric type casing in BaselineMetrics
kosiew Mar 30, 2026
80750ea
Fix mixed boolean handling in null_restriction
kosiew Mar 30, 2026
f8204b6
Fix metric type casing in BaselineMetrics
kosiew Mar 30, 2026
4208dd5
Merge branch 'main' into push-down-02-20002
kosiew Mar 31, 2026
bf36fac
formatting
kosiew Mar 31, 2026
1f2b598
refactor: improve join condition evaluation and filter management
kosiew Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 182 additions & 132 deletions datafusion/core/benches/sql_planner_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,31 @@
use arrow::array::{ArrayRef, RecordBatch};
use arrow_schema::DataType;
use arrow_schema::TimeUnit::Nanosecond;
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use criterion::{
BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main,
measurement::WallTime,
};
use datafusion::prelude::{DataFrame, SessionContext};
use datafusion_catalog::MemTable;
use datafusion_common::ScalarValue;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::Expr::Literal;
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::utils::split_conjunction_owned;
use datafusion_expr::{cast, col, lit, not, try_cast, when};
use datafusion_functions::expr_fn::{
btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
};
use std::env;
use std::fmt::Write;
use std::hint::black_box;
use std::ops::Rem;
use std::sync::Arc;
use tokio::runtime::Runtime;

const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60];
const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3];
const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)];

// This benchmark suite is designed to test the performance of
// logical planning with a large plan containing unions, many columns
// with a variety of operations in it.
Expand Down Expand Up @@ -218,7 +228,9 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
register_string_table(ctx, 100, 1000);
let query = build_case_heavy_left_join_query(30, 1);
rt.block_on(async { ctx.sql(&query).await.unwrap() })
let df = rt.block_on(async { ctx.sql(&query).await.unwrap() });
assert_case_heavy_left_join_inference_candidates(&df, 30);
df
}

fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -> String {
Expand All @@ -237,12 +249,17 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -
query.push_str(" AND ");
}

let mut expr = format!("length(l.c{})", i % 20);
let left_payload_col = (i % 19) + 1;
let right_payload_col = ((i + 7) % 19) + 1;
let mut expr = format!(
"CASE WHEN l.c0 IS NOT NULL THEN length(l.c{left_payload_col}) ELSE length(r.c{right_payload_col}) END"
);
for depth in 0..case_depth {
let left_col = (i + depth + 1) % 20;
let right_col = (i + depth + 2) % 20;
let left_col = ((i + depth + 3) % 19) + 1;
let right_col = ((i + depth + 11) % 19) + 1;
let join_key_ref = if (i + depth) % 2 == 0 { "l.c0" } else { "r.c0" };
expr = format!(
"CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE length(r.c{right_col}) END"
"CASE WHEN {join_key_ref} IS NOT NULL THEN {expr} ELSE CASE WHEN l.c{left_col} IS NOT NULL THEN length(l.c{left_col}) ELSE length(r.c{right_col}) END END"
);
}

Expand All @@ -252,26 +269,6 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -
query
}

fn build_case_heavy_left_join_df_with_push_down_filter(
rt: &Runtime,
predicate_count: usize,
case_depth: usize,
push_down_filter_enabled: bool,
) -> DataFrame {
let ctx = SessionContext::new();
register_string_table(&ctx, 100, 1000);
if !push_down_filter_enabled {
let removed = ctx.remove_optimizer_rule("push_down_filter");
assert!(
removed,
"push_down_filter rule should be present in the default optimizer"
);
}

let query = build_case_heavy_left_join_query(predicate_count, case_depth);
rt.block_on(async { ctx.sql(&query).await.unwrap() })
}

fn build_non_case_left_join_query(
predicate_count: usize,
nesting_depth: usize,
Expand Down Expand Up @@ -304,10 +301,11 @@ fn build_non_case_left_join_query(
query
}

fn build_non_case_left_join_df_with_push_down_filter(
fn build_left_join_df_with_push_down_filter(
rt: &Runtime,
query_builder: impl Fn(usize, usize) -> String,
predicate_count: usize,
nesting_depth: usize,
depth: usize,
push_down_filter_enabled: bool,
) -> DataFrame {
let ctx = SessionContext::new();
Expand All @@ -320,10 +318,138 @@ fn build_non_case_left_join_df_with_push_down_filter(
);
}

let query = build_non_case_left_join_query(predicate_count, nesting_depth);
let query = query_builder(predicate_count, depth);
rt.block_on(async { ctx.sql(&query).await.unwrap() })
}

/// Builds the CASE-heavy left-join benchmark `DataFrame`, optionally with the
/// `push_down_filter` optimizer rule removed, and validates that the planned
/// query still exposes the expected inference-candidate predicates.
///
/// Delegates query construction to `build_case_heavy_left_join_query` through
/// the shared `build_left_join_df_with_push_down_filter` helper, then asserts
/// the plan shape so a benchmark regression cannot silently measure the wrong
/// workload.
fn build_case_heavy_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    case_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    let data_frame = build_left_join_df_with_push_down_filter(
        rt,
        build_case_heavy_left_join_query,
        predicate_count,
        case_depth,
        push_down_filter_enabled,
    );
    // Guard the benchmark's premise: every predicate must remain a pushdown
    // inference candidate (checked against `predicate_count`).
    assert_case_heavy_left_join_inference_candidates(&data_frame, predicate_count);
    data_frame
}

/// Builds the non-CASE control-group left-join benchmark `DataFrame`.
///
/// Thin wrapper over `build_left_join_df_with_push_down_filter` that supplies
/// `build_non_case_left_join_query` as the query builder. When
/// `push_down_filter_enabled` is false the shared helper runs with the
/// `push_down_filter` optimizer rule removed (presumably via
/// `remove_optimizer_rule` — confirm in the shared helper), giving the
/// "without" arm of the A/B comparison.
///
/// Unlike the CASE-heavy variant, no plan-shape assertion is performed here.
fn build_non_case_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    nesting_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    build_left_join_df_with_push_down_filter(
        rt,
        build_non_case_left_join_query,
        predicate_count,
        nesting_depth,
        push_down_filter_enabled,
    )
}

/// Returns the conjuncts of the top-most `Filter` in `plan`, looking through
/// any `Projection` wrappers above it.
///
/// Panics when the plan does not start with a (possibly projected) `Filter`,
/// since the benchmark queries are expected to produce one.
fn find_filter_predicates(plan: &LogicalPlan) -> Vec<datafusion_expr::Expr> {
    if let LogicalPlan::Filter(filter) = plan {
        // Split `a AND b AND c` into its individual predicates.
        return split_conjunction_owned(filter.predicate.clone());
    }
    if let LogicalPlan::Projection(projection) = plan {
        // Projections may sit between the plan root and the filter; recurse.
        return find_filter_predicates(projection.input.as_ref());
    }
    let other = plan;
    panic!("expected benchmark query plan to contain a Filter, found {other:?}");
}

/// Asserts that the CASE-heavy left-join benchmark plan contains exactly
/// `expected_predicate_count` filter predicates and that each one is a
/// plausible join-key inference candidate: it must reference at least one
/// join key (`l.c0` / `r.c0`) and at least one non-join-key column.
fn assert_case_heavy_left_join_inference_candidates(
    df: &DataFrame,
    expected_predicate_count: usize,
) {
    let predicates = find_filter_predicates(df.logical_plan());
    assert_eq!(predicates.len(), expected_predicate_count);

    let left_join_key = Column::from_qualified_name("l.c0");
    let right_join_key = Column::from_qualified_name("r.c0");

    for predicate in predicates {
        let refs = predicate.column_refs();

        // Each predicate must touch a join key, otherwise the optimizer has
        // nothing to infer null-restriction from.
        let references_join_key =
            refs.contains(&&left_join_key) || refs.contains(&&right_join_key);
        assert!(
            references_join_key,
            "benchmark predicate should reference a join key: {predicate}"
        );

        // ...and must also touch a payload column, so the predicate is not a
        // trivial join-key-only expression.
        let references_payload_column = refs
            .iter()
            .any(|col| **col != left_join_key && **col != right_join_key);
        assert!(
            references_payload_column,
            "benchmark predicate should reference a non-join column: {predicate}"
        );
    }
}

/// Returns `true` when the `DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP`
/// environment variable is set to `"1"` or (case-insensitively) `"true"`,
/// opting the benchmark into the full predicate-count × depth sweep instead
/// of the smaller default set of sweep points.
fn include_full_push_down_filter_sweep() -> bool {
    match env::var("DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP") {
        Ok(value) => value == "1" || value.eq_ignore_ascii_case("true"),
        // Unset (or non-UTF-8) means: keep the cheap default sweep.
        Err(_) => false,
    }
}

/// Returns the `(predicate_count, depth)` sweep points to benchmark.
///
/// Uses the small `DEFAULT_SWEEP_POINTS` set unless the full-sweep
/// environment flag is set, in which case every combination of
/// `FULL_PREDICATE_SWEEP` and `FULL_DEPTH_SWEEP` is produced, grouped by
/// depth (same ordering as the default path's flat_map equivalent).
fn push_down_filter_sweep_points() -> Vec<(usize, usize)> {
    if !include_full_push_down_filter_sweep() {
        return DEFAULT_SWEEP_POINTS.to_vec();
    }

    let mut points =
        Vec::with_capacity(FULL_DEPTH_SWEEP.len() * FULL_PREDICATE_SWEEP.len());
    for depth in FULL_DEPTH_SWEEP {
        for predicate_count in FULL_PREDICATE_SWEEP {
            points.push((predicate_count, depth));
        }
    }
    points
}

/// Runs the A/B benchmark for one query family across all `sweep_points`.
///
/// For each `(predicate_count, depth)` point, two Criterion IDs are recorded
/// under the same input label so they can be compared directly:
/// - `with_push_down_filter`: default optimizer path (rule enabled)
/// - `without_push_down_filter`: control path with the rule removed
///
/// `build_df` receives `(rt, predicate_count, depth, push_down_filter_enabled)`
/// and must return the planned-but-unoptimized `DataFrame`; the measured work
/// is `into_optimized_plan()` only.
///
/// The two variants previously duplicated an identical benchmark closure;
/// they are now driven from a single `(id, enabled)` table so the measured
/// body cannot drift between the arms.
fn bench_push_down_filter_ab<BuildFn>(
    group: &mut BenchmarkGroup<'_, WallTime>,
    rt: &Runtime,
    sweep_points: &[(usize, usize)],
    build_df: BuildFn,
) where
    BuildFn: Fn(&Runtime, usize, usize, bool) -> DataFrame,
{
    for &(predicate_count, depth) in sweep_points {
        let input_label = format!("predicates={predicate_count},nesting_depth={depth}");

        // Both arms run byte-identical measurement code; only the optimizer
        // configuration baked into the DataFrame differs.
        for (id, push_down_filter_enabled) in [
            ("with_push_down_filter", true),
            ("without_push_down_filter", false),
        ] {
            let df = build_df(rt, predicate_count, depth, push_down_filter_enabled);
            group.bench_with_input(
                BenchmarkId::new(id, &input_label),
                &df,
                |b, df| {
                    b.iter(|| {
                        // Clone per iteration: into_optimized_plan consumes
                        // the DataFrame.
                        let df_clone = df.clone();
                        black_box(
                            rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
                        );
                    })
                },
            );
        }
    }
}

fn criterion_benchmark(c: &mut Criterion) {
let baseline_ctx = SessionContext::new();
let case_heavy_ctx = SessionContext::new();
Expand All @@ -349,116 +475,40 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

let predicate_sweep = [10, 20, 30, 40, 60];
let case_depth_sweep = [1, 2, 3];
let sweep_points = push_down_filter_sweep_points();

let mut hotspot_group =
c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab");
for case_depth in case_depth_sweep {
for predicate_count in predicate_sweep {
let with_push_down_filter =
build_case_heavy_left_join_df_with_push_down_filter(
&rt,
predicate_count,
case_depth,
true,
);
let without_push_down_filter =
build_case_heavy_left_join_df_with_push_down_filter(
&rt,
predicate_count,
case_depth,
false,
);

let input_label =
format!("predicates={predicate_count},case_depth={case_depth}");
// A/B interpretation:
// - with_push_down_filter: default optimizer path (rule enabled)
// - without_push_down_filter: control path with the rule removed
// Compare both IDs at the same sweep point to isolate rule impact.
hotspot_group.bench_with_input(
BenchmarkId::new("with_push_down_filter", &input_label),
&with_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
hotspot_group.bench_with_input(
BenchmarkId::new("without_push_down_filter", &input_label),
&without_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
}
}
bench_push_down_filter_ab(
&mut hotspot_group,
&rt,
&sweep_points,
|rt, predicate_count, depth, enable| {
build_case_heavy_left_join_df_with_push_down_filter(
rt,
predicate_count,
depth,
enable,
)
},
);
hotspot_group.finish();

let mut control_group =
c.benchmark_group("push_down_filter_control_non_case_left_join_ab");
for nesting_depth in case_depth_sweep {
for predicate_count in predicate_sweep {
let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter(
&rt,
bench_push_down_filter_ab(
&mut control_group,
&rt,
&sweep_points,
|rt, predicate_count, depth, enable| {
build_non_case_left_join_df_with_push_down_filter(
rt,
predicate_count,
nesting_depth,
true,
);
let without_push_down_filter =
build_non_case_left_join_df_with_push_down_filter(
&rt,
predicate_count,
nesting_depth,
false,
);

let input_label =
format!("predicates={predicate_count},nesting_depth={nesting_depth}");
control_group.bench_with_input(
BenchmarkId::new("with_push_down_filter", &input_label),
&with_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
control_group.bench_with_input(
BenchmarkId::new("without_push_down_filter", &input_label),
&without_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
}
}
depth,
enable,
)
},
);
control_group.finish();
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub mod create_drop;
pub mod explain_analyze;
pub mod joins;
mod path_partition;
mod push_down_filter_regressions;
mod runtime_config;
pub mod select;
mod sql_api;
Expand Down
Loading
Loading