feat: Add selectivity-tracking wrapper for dynamic filters #20160
adriangb wants to merge 14 commits into apache:main from
8ccef6c to 9a1ebd9
datafusion/common/src/config.rs
Outdated
/// Enable selectivity-based disabling of dynamic filters from joins.
///
/// When enabled, join dynamic filters that pass most rows (above the threshold)
/// will be automatically disabled to avoid evaluation overhead. This is useful
/// when the build side of a join covers most of the probe side values, making
/// the filter expensive to evaluate for little benefit.
///
/// The selectivity tracking resets when the dynamic filter is updated (e.g., when
/// the hash table is built), allowing the filter to be re-evaluated with new data.
pub enable_dynamic_filter_selectivity_tracking: bool, default = false

/// Selectivity threshold for disabling join dynamic filters.
///
/// If the filter passes this fraction or more of rows, it will be disabled.
/// Value should be between 0.0 and 1.0.
///
/// For example, 0.95 means if 95% or more of rows pass the filter, it will be disabled.
/// Only used when `enable_dynamic_filter_selectivity_tracking` is true.
pub dynamic_filter_selectivity_threshold: f64, default = 0.95

/// Minimum number of rows to process before making a selectivity decision
/// for join dynamic filters.
///
/// The filter will remain in a tracking state until this many rows have been
/// processed. This ensures statistical stability before making the disable decision.
/// Only used when `enable_dynamic_filter_selectivity_tracking` is true.
pub dynamic_filter_min_rows_for_selectivity: usize, default = 10_000
Not sure we need all of these, or at least not sure they should be prefixed with dynamic_filter
pub struct SelectivityConfig {
    /// Threshold above which the filter is disabled (e.g., 0.95 = 95% selectivity).
    /// If the filter passes this fraction or more of rows, it will be disabled.
    pub threshold: f64,
Could be in GB/s? Rows/s?
    /// If the filter passes this fraction or more of rows, it will be disabled.
    pub threshold: f64,
    /// Minimum rows to process before making a selectivity decision.
    pub min_rows: usize,
Could be in GB? Number of batches? Time? (or all of the above?)
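The two fields under review can be read as a simple decision rule. The following is a minimal sketch of that rule, not the PR's implementation: `SelectivityTracker`, `Decision`, `record_batch`, and `decision` are hypothetical names introduced here for illustration, and the use of relaxed atomic counters is an assumption about how shared counters might be kept cheap.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Mirrors the two fields shown in the diff excerpt above.
pub struct SelectivityConfig {
    /// Fraction of passing rows at or above which the filter is disabled.
    pub threshold: f64,
    /// Rows that must be observed before any decision is made.
    pub min_rows: usize,
}

/// Hypothetical decision states (illustrative names, not from the PR).
#[derive(Debug, PartialEq)]
pub enum Decision {
    /// Not enough rows observed yet.
    Tracking,
    /// The filter prunes enough rows to be worth evaluating.
    Keep,
    /// The filter passes too many rows and should be skipped.
    Disable,
}

/// Hypothetical tracker that accumulates row counts across batches.
pub struct SelectivityTracker {
    config: SelectivityConfig,
    rows_total: AtomicUsize,
    rows_passed: AtomicUsize,
}

impl SelectivityTracker {
    pub fn new(config: SelectivityConfig) -> Self {
        Self {
            config,
            rows_total: AtomicUsize::new(0),
            rows_passed: AtomicUsize::new(0),
        }
    }

    /// Record one evaluated batch: rows seen and rows that passed the filter.
    pub fn record_batch(&self, rows_in: usize, rows_out: usize) {
        self.rows_total.fetch_add(rows_in, Ordering::Relaxed);
        self.rows_passed.fetch_add(rows_out, Ordering::Relaxed);
    }

    /// Apply the rule `rows_passed / rows_total >= threshold` once
    /// at least `min_rows` rows have been observed.
    pub fn decision(&self) -> Decision {
        let total = self.rows_total.load(Ordering::Relaxed);
        if total == 0 || total < self.config.min_rows {
            return Decision::Tracking;
        }
        let passed = self.rows_passed.load(Ordering::Relaxed);
        let selectivity = passed as f64 / total as f64;
        if selectivity >= self.config.threshold {
            Decision::Disable
        } else {
            Decision::Keep
        }
    }
}
```

With `threshold: 0.95` and `min_rows: 10_000`, recording one batch of 8,192 rows with 8,000 passing leaves the tracker in `Tracking`. A second identical batch brings the total to 16,384 rows with a pass rate of about 0.977, so the decision becomes `Disable`. A row-count gate is only one option; the review comments above suggest bytes, batches, or elapsed time as alternatives.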
run benchmark tpchds tpch
run benchmark tpchds tpch tpch10
run benchmarks
show benchmark queue
run benchmark tpcds
run benchmark tpch
🤖: Benchmark completed
🤖: Benchmark completed
🤖: Benchmark completed
I expect benchmarks to look bad - there's overhead this wrapper introduces right now even in no-op mode. I'm going to push a fix and run benches again.
run benchmark tpch
run benchmark tpcds
🤖: Benchmark completed
🤖: Benchmark completed
run benchmark tpcds
🤖: Benchmark completed
The snapshot() method was returning None, causing the wrapper to be preserved during snapshotting. Since PruningPredicate doesn't recognize the wrapper type, it fell back to lit(true), which disabled all row group and file pruning.

Changed snapshot() to return the inner expression directly, stripping the wrapper during snapshotting so pruning predicates work correctly.

Also set enable_adaptive_filter_selectivity_tracking default to false since this is an experimental feature.

Benchmarks show no slowdowns after this fix.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
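The failure mode in this commit message can be illustrated with a toy model. This is a sketch under stated assumptions, not DataFusion's `PhysicalExpr` or `PruningPredicate` API: the `Expr` enum, both snapshot functions, and `pruning_predicate` are hypothetical stand-ins that only reproduce the described behaviour, where an unrecognized wrapper node makes the pruner fall back to a literal `true`.

```rust
/// Toy expression tree (hypothetical; not DataFusion's `PhysicalExpr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Literal(bool),
    /// `column >= min`, a shape the toy pruner understands.
    GtEq { column: String, min: i64 },
    /// Stand-in for the selectivity-tracking wrapper.
    SelectivityWrapper(Box<Expr>),
}

/// Snapshot that leaves the wrapper in place (the behaviour described as broken).
fn snapshot_keep_wrapper(expr: &Expr) -> Expr {
    expr.clone()
}

/// Snapshot that strips the wrapper and returns the inner expression.
fn snapshot_strip_wrapper(expr: &Expr) -> Expr {
    match expr {
        Expr::SelectivityWrapper(inner) => snapshot_strip_wrapper(inner),
        other => other.clone(),
    }
}

/// Toy pruner: node types it does not recognize become `Literal(true)`,
/// which means "cannot prune anything".
fn pruning_predicate(expr: &Expr) -> Expr {
    match expr {
        Expr::Literal(_) | Expr::GtEq { .. } => expr.clone(),
        Expr::SelectivityWrapper(_) => Expr::Literal(true),
    }
}
```

In this model, a wrapped `id >= 10` filter snapshotted with `snapshot_keep_wrapper` yields `Literal(true)` from `pruning_predicate`, while `snapshot_strip_wrapper` lets the pruner see the original comparison.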
Simplifies the implementation by removing generation-aware reset logic:
- No longer tracks inner filter's generation
- snapshot_generation() returns inner's generation, or 0 when disabled
- Faster evaluate() path without generation checks

The generation tracking was unnecessary for hash join filters and added overhead to the hot path.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
22439b2 to 4124aae
… tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
run benchmark tpcds
🤖: Benchmark completed
run benchmark clickbench_partitioned
@Dandandan numbers looking pretty good here!
🤖: Benchmark completed
run benchmarks
run benchmarks
run benchmark tpch
🤖: Benchmark completed
🤖: Benchmark completed
🤖: Benchmark completed
So an improvement for 2 queries, but still most queries are slower than without pushdown.
This is the main improvement.
Ok - yes, I see some improvements here and there, but it is still largely regressing relative to main by ~30s (TPCDS runs in ~50s without and ~80s with filter pushdown). See e.g. this run #20318 (comment) against main without dynamic filter pushdown.
This ~26x regression (and many others) is still unchanged in this PR:
As we're running with
I think I now have an understanding of why the current approach's adaptiveness isn't helping that much yet. For it to work effectively, I think it needs to integrate more with the parquet reader to remove or add a filter based on the adaptiveness during the scan.
Summary
Add SelectivityAwareFilterExpr, a wrapper PhysicalExpr that tracks filter selectivity at runtime and automatically disables filters that aren't pruning enough rows. This addresses the issue where dynamic filters from HashJoinExec can be expensive to evaluate for little benefit when the build side covers most of the probe side values.

Key Features
- rows_passed / rows_total >= threshold
- min_rows before making a decision

New Configuration Options
Added to OptimizerOptions:
- enable_dynamic_filter_selectivity_tracking (default: false)
- dynamic_filter_selectivity_threshold (default: 0.95)
- dynamic_filter_min_rows_for_selectivity (default: 10000)

Files Changed
- datafusion/physical-expr/src/expressions/selectivity_aware_filter.rs
- datafusion/physical-expr/src/expressions/mod.rs
- datafusion/common/src/config.rs: OptimizerOptions
- datafusion/physical-plan/src/joins/hash_join/exec.rs
- datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt

Test plan
- SelectivityAwareFilterExpr (6 tests)

🤖 Generated with Claude Code