Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 187 additions & 132 deletions datafusion/core/benches/sql_planner_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,31 @@
use arrow::array::{ArrayRef, RecordBatch};
use arrow_schema::DataType;
use arrow_schema::TimeUnit::Nanosecond;
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use criterion::{
BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main,
measurement::WallTime,
};
use datafusion::prelude::{DataFrame, SessionContext};
use datafusion_catalog::MemTable;
use datafusion_common::ScalarValue;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::Expr::Literal;
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::utils::split_conjunction_owned;
use datafusion_expr::{cast, col, lit, not, try_cast, when};
use datafusion_functions::expr_fn::{
btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
};
use std::env;
use std::fmt::Write;
use std::hint::black_box;
use std::ops::Rem;
use std::sync::Arc;
use tokio::runtime::Runtime;

// Full A/B sweep grid: every predicate count crossed with every CASE/nesting
// depth (15 points). Only used when the full-sweep env flag is set (see
// `include_full_push_down_filter_sweep`), since the cross product is slow.
const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60];
const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3];
// Default (fast) sweep: a small diagonal of (predicate_count, depth) points
// that still covers the low/medium/high ends of the full grid.
const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)];

// This benchmark suite is designed to test the performance of
// logical planning with a large plan containing unions, many columns
// with a variety of operations in it.
Expand Down Expand Up @@ -218,7 +228,9 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
/// Builds the case-heavy LEFT JOIN DataFrame used by the baseline planning
/// benchmark: a 100-column, 1000-row string table joined to itself with
/// 30 CASE-wrapped predicates at depth 1.
///
/// The logical plan is validated up front so the benchmark fails fast if the
/// generated query ever stops producing join-key inference candidates.
fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
    const PREDICATE_COUNT: usize = 30;
    register_string_table(ctx, 100, 1000);
    let sql = build_case_heavy_left_join_query(PREDICATE_COUNT, 1);
    let frame = rt.block_on(async { ctx.sql(&sql).await.unwrap() });
    assert_case_heavy_left_join_inference_candidates(&frame, PREDICATE_COUNT);
    frame
}

fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -> String {
Expand All @@ -237,12 +249,17 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -
query.push_str(" AND ");
}

let mut expr = format!("length(l.c{})", i % 20);
let left_payload_col = (i % 19) + 1;
let right_payload_col = ((i + 7) % 19) + 1;
let mut expr = format!(
"CASE WHEN l.c0 IS NOT NULL THEN length(l.c{left_payload_col}) ELSE length(r.c{right_payload_col}) END"
);
for depth in 0..case_depth {
let left_col = (i + depth + 1) % 20;
let right_col = (i + depth + 2) % 20;
let left_col = ((i + depth + 3) % 19) + 1;
let right_col = ((i + depth + 11) % 19) + 1;
let join_key_ref = if (i + depth) % 2 == 0 { "l.c0" } else { "r.c0" };
expr = format!(
"CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE length(r.c{right_col}) END"
"CASE WHEN {join_key_ref} IS NOT NULL THEN {expr} ELSE CASE WHEN l.c{left_col} IS NOT NULL THEN length(l.c{left_col}) ELSE length(r.c{right_col}) END END"
);
}

Expand All @@ -252,26 +269,6 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -
query
}

/// Builds a case-heavy LEFT JOIN DataFrame for one A/B sweep point.
///
/// When `push_down_filter_enabled` is false the `push_down_filter` optimizer
/// rule is removed from a fresh `SessionContext`, giving the "control" arm of
/// the comparison; otherwise the default optimizer is used unchanged.
fn build_case_heavy_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    case_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    let ctx = SessionContext::new();
    register_string_table(&ctx, 100, 1000);
    if !push_down_filter_enabled {
        // The removal must succeed, or the two arms of the A/B comparison
        // would be running the same optimizer.
        let was_present = ctx.remove_optimizer_rule("push_down_filter");
        assert!(
            was_present,
            "push_down_filter rule should be present in the default optimizer"
        );
    }

    let sql = build_case_heavy_left_join_query(predicate_count, case_depth);
    rt.block_on(async { ctx.sql(&sql).await.unwrap() })
}

fn build_non_case_left_join_query(
predicate_count: usize,
nesting_depth: usize,
Expand Down Expand Up @@ -304,10 +301,11 @@ fn build_non_case_left_join_query(
query
}

fn build_non_case_left_join_df_with_push_down_filter(
fn build_left_join_df_with_push_down_filter(
rt: &Runtime,
query_builder: impl Fn(usize, usize) -> String,
predicate_count: usize,
nesting_depth: usize,
depth: usize,
push_down_filter_enabled: bool,
) -> DataFrame {
let ctx = SessionContext::new();
Expand All @@ -320,10 +318,143 @@ fn build_non_case_left_join_df_with_push_down_filter(
);
}

let query = build_non_case_left_join_query(predicate_count, nesting_depth);
let query = query_builder(predicate_count, depth);
rt.block_on(async { ctx.sql(&query).await.unwrap() })
}

/// Builds the case-heavy LEFT JOIN DataFrame for one A/B sweep point and
/// validates that its filter predicates remain join-key inference candidates.
///
/// `push_down_filter_enabled` selects the optimizer arm: true keeps the
/// default rule set, false removes the `push_down_filter` rule.
fn build_case_heavy_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    case_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    let frame = build_left_join_df_with_push_down_filter(
        rt,
        build_case_heavy_left_join_query,
        predicate_count,
        case_depth,
        push_down_filter_enabled,
    );
    // Guard against query-generator drift: each predicate must still
    // reference a join key plus a non-join column.
    assert_case_heavy_left_join_inference_candidates(&frame, predicate_count);
    frame
}

/// Builds the non-CASE LEFT JOIN control DataFrame for one A/B sweep point.
///
/// Thin wrapper over [`build_left_join_df_with_push_down_filter`] that plugs
/// in the non-CASE query generator; no extra plan validation is performed for
/// this control query.
fn build_non_case_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    nesting_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    let builder = build_non_case_left_join_query;
    build_left_join_df_with_push_down_filter(
        rt,
        builder,
        predicate_count,
        nesting_depth,
        push_down_filter_enabled,
    )
}

/// Walks down through any `Projection` nodes to the first `Filter` and
/// returns its predicate split into its top-level AND conjuncts.
///
/// Panics if the plan contains any other node before a `Filter` is found —
/// the benchmark queries are expected to produce a Filter near the root.
fn find_filter_predicates(plan: &LogicalPlan) -> Vec<datafusion_expr::Expr> {
    let mut node = plan;
    loop {
        match node {
            LogicalPlan::Filter(filter) => {
                return split_conjunction_owned(filter.predicate.clone());
            }
            // Projections are transparent for this search; keep descending.
            LogicalPlan::Projection(projection) => node = projection.input.as_ref(),
            other => {
                panic!("expected benchmark query plan to contain a Filter, found {other:?}")
            }
        }
    }
}

/// Asserts the benchmark query still produces the predicate shape the
/// push-down-filter A/B test depends on: exactly
/// `expected_predicate_count` conjuncts, each referencing at least one join
/// key (`l.c0` / `r.c0`) AND at least one non-join-key column.
fn assert_case_heavy_left_join_inference_candidates(
    df: &DataFrame,
    expected_predicate_count: usize,
) {
    let predicates = find_filter_predicates(df.logical_plan());
    assert_eq!(predicates.len(), expected_predicate_count);

    let join_keys = [
        Column::from_qualified_name("l.c0"),
        Column::from_qualified_name("r.c0"),
    ];

    for predicate in predicates {
        let refs = predicate.column_refs();

        let touches_join_key = join_keys.iter().any(|key| refs.contains(&key));
        assert!(
            touches_join_key,
            "benchmark predicate should reference a join key: {predicate}"
        );

        let touches_payload_column = refs
            .iter()
            .any(|col| join_keys.iter().all(|key| **col != *key));
        assert!(
            touches_payload_column,
            "benchmark predicate should reference a non-join column: {predicate}"
        );
    }
}

/// Returns true when the `DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP` env var is
/// set to `"1"` or (case-insensitively) `"true"`, opting in to the full,
/// slower sweep grid instead of the default diagonal.
fn include_full_push_down_filter_sweep() -> bool {
    matches!(
        env::var("DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP"),
        Ok(value) if value == "1" || value.eq_ignore_ascii_case("true")
    )
}

/// Returns the `(predicate_count, depth)` sweep points for the A/B groups.
///
/// Defaults to the fast diagonal `DEFAULT_SWEEP_POINTS`; when the full-sweep
/// env flag is set, yields the complete cross product of
/// `FULL_DEPTH_SWEEP` x `FULL_PREDICATE_SWEEP` (depth-major order).
fn push_down_filter_sweep_points() -> Vec<(usize, usize)> {
    if !include_full_push_down_filter_sweep() {
        return DEFAULT_SWEEP_POINTS.to_vec();
    }

    let mut points =
        Vec::with_capacity(FULL_DEPTH_SWEEP.len() * FULL_PREDICATE_SWEEP.len());
    for depth in FULL_DEPTH_SWEEP {
        for predicate_count in FULL_PREDICATE_SWEEP {
            points.push((predicate_count, depth));
        }
    }
    points
}

/// Registers one A/B benchmark pair per sweep point in `group`.
///
/// A/B interpretation:
/// - `with_push_down_filter`: default optimizer path (rule enabled)
/// - `without_push_down_filter`: control path with the rule removed
///
/// Compare both IDs at the same sweep point to isolate the rule's impact.
/// `build_df` maps `(rt, predicate_count, depth, push_down_filter_enabled)`
/// to the DataFrame whose optimization is measured.
fn bench_push_down_filter_ab<BuildFn>(
    group: &mut BenchmarkGroup<'_, WallTime>,
    rt: &Runtime,
    sweep_points: &[(usize, usize)],
    build_df: BuildFn,
) where
    BuildFn: Fn(&Runtime, usize, usize, bool) -> DataFrame,
{
    for &(predicate_count, depth) in sweep_points {
        let input_label = format!("predicates={predicate_count},nesting_depth={depth}");

        // The two arms differ only in whether the push_down_filter rule is
        // enabled; drive both from one loop instead of duplicating the
        // bench closure.
        for (function_id, rule_enabled) in [
            ("with_push_down_filter", true),
            ("without_push_down_filter", false),
        ] {
            let df = build_df(rt, predicate_count, depth, rule_enabled);
            group.bench_with_input(
                BenchmarkId::new(function_id, &input_label),
                &df,
                |b, df| {
                    b.iter(|| {
                        // `into_optimized_plan` consumes the DataFrame, so
                        // each iteration measures clone + optimization.
                        let df_clone = df.clone();
                        black_box(
                            rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
                        );
                    })
                },
            );
        }
    }
}

fn criterion_benchmark(c: &mut Criterion) {
let baseline_ctx = SessionContext::new();
let case_heavy_ctx = SessionContext::new();
Expand All @@ -349,116 +480,40 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

let predicate_sweep = [10, 20, 30, 40, 60];
let case_depth_sweep = [1, 2, 3];
let sweep_points = push_down_filter_sweep_points();

let mut hotspot_group =
c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab");
for case_depth in case_depth_sweep {
for predicate_count in predicate_sweep {
let with_push_down_filter =
build_case_heavy_left_join_df_with_push_down_filter(
&rt,
predicate_count,
case_depth,
true,
);
let without_push_down_filter =
build_case_heavy_left_join_df_with_push_down_filter(
&rt,
predicate_count,
case_depth,
false,
);

let input_label =
format!("predicates={predicate_count},case_depth={case_depth}");
// A/B interpretation:
// - with_push_down_filter: default optimizer path (rule enabled)
// - without_push_down_filter: control path with the rule removed
// Compare both IDs at the same sweep point to isolate rule impact.
hotspot_group.bench_with_input(
BenchmarkId::new("with_push_down_filter", &input_label),
&with_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
hotspot_group.bench_with_input(
BenchmarkId::new("without_push_down_filter", &input_label),
&without_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
}
}
bench_push_down_filter_ab(
&mut hotspot_group,
&rt,
&sweep_points,
|rt, predicate_count, depth, enable| {
build_case_heavy_left_join_df_with_push_down_filter(
rt,
predicate_count,
depth,
enable,
)
},
);
hotspot_group.finish();

let mut control_group =
c.benchmark_group("push_down_filter_control_non_case_left_join_ab");
for nesting_depth in case_depth_sweep {
for predicate_count in predicate_sweep {
let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter(
&rt,
bench_push_down_filter_ab(
&mut control_group,
&rt,
&sweep_points,
|rt, predicate_count, depth, enable| {
build_non_case_left_join_df_with_push_down_filter(
rt,
predicate_count,
nesting_depth,
true,
);
let without_push_down_filter =
build_non_case_left_join_df_with_push_down_filter(
&rt,
predicate_count,
nesting_depth,
false,
);

let input_label =
format!("predicates={predicate_count},nesting_depth={nesting_depth}");
control_group.bench_with_input(
BenchmarkId::new("with_push_down_filter", &input_label),
&with_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
control_group.bench_with_input(
BenchmarkId::new("without_push_down_filter", &input_label),
&without_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
}
}
depth,
enable,
)
},
);
control_group.finish();
}

Expand Down
Loading