feat: adaptive filter selectivity tracking for Parquet row filters #19639
adriangb wants to merge 2 commits into apache:main
Conversation
run benchmarks

show benchmark queue

🤖 Hi @adriangb, you asked to view the benchmark queue (#19639 (comment)).

🤖: Benchmark completed Details
run benchmarks tpch

🤖 Hi @adriangb, thanks for the request (#19639 (comment)). Please choose one or more of these with …

run benchmark tpch

🤖: Benchmark completed Details

run benchmark tpch

🤖: Benchmark completed Details
This is probably not a good issue to pick up. This is a draft PR for an unproven idea.

run benchmark tpch

🤖: Benchmark completed Details

run benchmark tpch

🤖: Benchmark completed Details
Did you find any evidence that the selectivity of predicates changes over the course of the query (or, put another way, that reordering them during execution would help)?

One case where that happens for sure is dynamic filters 😉 (although we treat each version of one as a different filter, the point is that we need to be dynamic about new filters showing up mid-query). But I also don't think it's unusual to have unevenly distributed data across files. I do agree that if we change how we apply a filter between files, that probably captures 95% of the benefit (as opposed to within a scan of a single file). But the main point is that we start from "we know nothing", which we treat as "nothing is selective", and then once we know a filter is selective enough we move it over to be a row filter.
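A minimal sketch of that promotion rule (the FilterStats type and should_promote helper here are illustrative stand-ins, not the PR's API): with no observations the effectiveness is treated as 0.0, so a filter stays out of the row-filter path until data from earlier files proves it selective.

```rust
/// Hypothetical per-filter statistics; illustrative only, not the PR's types.
#[derive(Default)]
struct FilterStats {
    rows_evaluated: u64,
    rows_passed: u64,
}

impl FilterStats {
    /// Fraction of rows the filter removed so far (0.0 = removed nothing).
    fn effectiveness(&self) -> f64 {
        if self.rows_evaluated == 0 {
            // "We know nothing" is treated as "not selective": the filter stays
            // a post-scan filter until earlier files prove otherwise.
            0.0
        } else {
            1.0 - self.rows_passed as f64 / self.rows_evaluated as f64
        }
    }

    /// Promote to a Parquet row filter only once the filter has shown it
    /// removes enough rows to be worth the pushdown I/O cost.
    fn should_promote(&self, threshold: f64) -> bool {
        self.effectiveness() >= threshold
    }
}

fn main() {
    let mut stats = FilterStats::default();
    assert!(!stats.should_promote(0.8)); // unknown => not pushed down yet

    // After one file: 1_000 rows evaluated, 50 survived => 95% filtered out.
    stats.rows_evaluated += 1_000;
    stats.rows_passed += 50;
    assert!(stats.should_promote(0.8)); // now worth promoting to a row filter
}
```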
Hey @adriangb, I've been thinking about something like this since the New Year. It's really cool to see you putting together a draft for it. I haven't had a chance to take a full pass through your code, but I wanted to share some research I did earlier that might be relevant:
Before seeing your PR and the comments in #3463 I was thinking about using simpler heuristics for sorting predicates.
From a quick skim of the original ClickHouse PR, they still rely on some simple heuristics when column statistics aren't available. I would like to give your PR a proper review once I'm home, but I already love the direction you're taking.
Seeing the results in #19694 (comment), I think something like this is needed. However, I also see some regressions there that are related to the execution order of the filter expressions (such as regex / string-matching functions coming earlier now). I guess we also need some adaptiveness for that (e.g. based on measured time or some static heuristic).
Perhaps that should also help with other expensive expressions, like join filter pushdown: we materialize columns using cheap/selective filters first and then expensive ones like …
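One way to read "adaptiveness based on measured time" is to rank predicates by selectivity per unit of measured evaluation cost rather than by selectivity alone. The sketch below is not part of this PR, and PredicateCost / rank are made-up names, but it shows why a cheap equality check can come out ahead of a slightly more selective regex:

```rust
use std::cmp::Ordering;

/// Made-up bookkeeping per predicate; not part of this PR.
struct PredicateCost {
    name: &'static str,
    /// Fraction of rows removed, observed so far.
    selectivity: f64,
    /// Measured evaluation cost in nanoseconds per row.
    ns_per_row: f64,
}

impl PredicateCost {
    /// Rows removed per nanosecond spent: a simple "bang for the buck" score.
    fn rank(&self) -> f64 {
        self.selectivity / self.ns_per_row.max(1e-9)
    }
}

/// Order predicates so cheap, selective ones run before expensive ones.
fn order_predicates(preds: &mut [PredicateCost]) {
    preds.sort_by(|a, b| b.rank().partial_cmp(&a.rank()).unwrap_or(Ordering::Equal));
}

fn main() {
    let mut preds = [
        PredicateCost { name: "url ~ regex", selectivity: 0.90, ns_per_row: 500.0 },
        PredicateCost { name: "id = 42", selectivity: 0.80, ns_per_row: 5.0 },
    ];
    order_predicates(&mut preds);
    // The equality predicate wins despite lower selectivity, because it is ~100x cheaper.
    assert_eq!(preds[0].name, "id = 42");
}
```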
Yeah, makes sense to me 👍🏻. But if we can't show via benchmarks that this is an immediate win... can we justify moving forward with it?
In #19694 (comment) it seems this adaptivity is better than main (almost no regression on tpch and clickbench) when also setting …
Force-pushed from c0b86d7 to b13bd64.
run benchmark tpcds

run benchmark clickbench
🤖 Hi @adriangb, thanks for the request (#19639 (comment)). Please choose one or more of these with … You can also set environment variables on subsequent lines. Unsupported benchmarks: clickbench.
run benchmark clickbench_partitioned

🤖: Benchmark completed Details

🤖: Benchmark completed Details

run benchmark clickbench_partitioned

run benchmark tpcds
@Dandandan @alamb these numbers are looking pretty good

🤖: Benchmark completed Details

🤖: Benchmark completed Details
/// promoted. A value of 0.0 means all filters will be promoted.
/// Because there can be a high I/O cost to pushing down ineffective filters,
/// recommended values are in the range [0.8, 0.95], depending on random I/O costs.
pub filter_effectiveness_threshold: f64, default = 0.8
Can we check 0.5 as well here?
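For experimenting with different thresholds, something like the following should work once the option lands. The datafusion.execution.parquet.* key is an assumption based on how the other parquet options are exposed, and the table name/path are placeholders:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Assumed key: the PR adds parquet_options.filter_effectiveness_threshold,
    // so it would presumably surface like the other parquet options.
    ctx.sql("SET datafusion.execution.parquet.filter_effectiveness_threshold = 0.5")
        .await?;

    // Placeholder table/path, just to have something the filters can run against.
    ctx.register_parquet("hits", "data/hits.parquet", ParquetReadOptions::default())
        .await?;

    // Lower threshold => more filters get promoted to Parquet row filters;
    // 0.0 promotes everything, values near 1.0 promote almost nothing.
    let df = ctx
        .sql("SELECT count(*) FROM hits WHERE \"URL\" LIKE '%google%'")
        .await?;
    df.show().await?;
    Ok(())
}
```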
Is it just me, or am I not seeing a really large difference in performance? (I would have expected more.)
}

if reorder_predicates {
    candidates_with_exprs.sort_unstable_by(|(_, c1), (_, c2)| {
AFAIK build_row_filter_with_metrics only runs once, on file open?
Which means that for all files/partitions that are opened directly at start it will not do anything.
(E.g. for TPCH / TPCDS this is not helping much, as the number of files is limited, so it will only help if partitions are started one after another.)
clickbench_partitioned consists of 100 files - so it might help more there.
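To illustrate that point about per-file opening, here is a sketch (Tracker and Candidate are made-up stand-ins, not the PR's types) of the effectiveness-based sort being consulted once per opened file: files opened before any stream has finished see the original order, and only later files benefit from the tracked stats.

```rust
use std::collections::HashMap;

/// Stand-in for the physical filter expression and its candidate metadata.
struct Candidate {
    expr: String,
}

#[derive(Default)]
struct Tracker {
    /// Observed effectiveness per expression, updated as file streams complete.
    effectiveness: HashMap<String, f64>,
}

impl Tracker {
    /// Order candidates so the most effective filters (observed on files that
    /// have already completed) are evaluated first for the file being opened.
    fn reorder(&self, candidates: &mut [Candidate]) {
        candidates.sort_by(|a, b| {
            let ea = self.effectiveness.get(&a.expr).copied().unwrap_or(0.0);
            let eb = self.effectiveness.get(&b.expr).copied().unwrap_or(0.0);
            eb.partial_cmp(&ea).unwrap()
        });
    }
}

fn main() {
    let mut tracker = Tracker::default();
    let mut candidates = vec![
        Candidate { expr: "a LIKE '%x%'".into() },
        Candidate { expr: "b = 1".into() },
    ];

    // File 1 opens before any stats exist: order is unchanged (no benefit yet).
    tracker.reorder(&mut candidates);
    assert_eq!(candidates[0].expr, "a LIKE '%x%'");

    // File 1 finishes and reports that `b = 1` filtered far more rows...
    tracker.effectiveness.insert("b = 1".into(), 0.95);
    tracker.effectiveness.insert("a LIKE '%x%'".into(), 0.10);

    // ...so file 2 (opened later) evaluates it first. Files opened concurrently
    // at startup never see this, which is the reviewer's point above.
    tracker.reorder(&mut candidates);
    assert_eq!(candidates[0].expr, "b = 1");
}
```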
Summary
This PR implements cross-file tracking of filter selectivity in ParquetSource to adaptively reorder and demote low-selectivity filters, as discussed in #3463 (comment).
Key changes:
- SelectivityTracker tracks filter effectiveness across files, using an ExprKey wrapper for structural equality
- ParquetOpener queries shared stats to partition filters into row filters (push down) vs post-scan filters (inline application)
- Post-scan filters are applied via apply_post_scan_filters(), then filter columns are removed from the output
- A SelectivityUpdatingStream wrapper updates the tracker when the stream completes
- build_row_filter_with_metrics() returns per-filter metrics for selectivity tracking
Configuration:
- parquet_options.filter_effectiveness_threshold (default: 0.8)
Files added:
- datafusion/datasource-parquet/src/selectivity.rs - Core tracking infrastructure
Files modified:
- opener.rs - Filter partitioning, post-scan application, SelectivityUpdatingStream
- row_filter.rs - FilterMetrics, RowFilterWithMetrics, effectiveness-based reordering
- source.rs - selectivity_tracker field and builder methods
- config.rs - filter_effectiveness_threshold config option
Test plan
- ExprKey hash/eq consistency
- SelectivityStats::effectiveness() edge cases
- SelectivityTracker::partition_filters() threshold logic
🤖 Generated with Claude Code
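As a rough sketch of the tracker described in the summary above (names mirror the summary, but the bodies are illustrative and will differ from the actual code in datafusion/datasource-parquet/src/selectivity.rs), the effectiveness() and partition_filters() pieces could look like this:

```rust
use std::collections::HashMap;

/// Illustrative sketch only; keyed by a stand-in for ExprKey (here the display string).
#[derive(Default)]
struct SelectivityTracker {
    stats: HashMap<String, (u64, u64)>, // (rows evaluated, rows passed)
}

impl SelectivityTracker {
    fn effectiveness(&self, expr: &str) -> f64 {
        match self.stats.get(expr) {
            Some((evaluated, passed)) if *evaluated > 0 => {
                1.0 - *passed as f64 / *evaluated as f64
            }
            // Unknown filters are treated as ineffective until observed.
            _ => 0.0,
        }
    }

    /// Split filters into those worth pushing down as Parquet row filters and
    /// those applied after the scan, based on the configured threshold.
    fn partition_filters(
        &self,
        filters: Vec<String>,
        threshold: f64,
    ) -> (Vec<String>, Vec<String>) {
        filters
            .into_iter()
            .partition(|f| self.effectiveness(f) >= threshold)
    }
}

fn main() {
    let mut tracker = SelectivityTracker::default();
    tracker.stats.insert("a = 1".into(), (1_000, 10));         // 99% effective
    tracker.stats.insert("b LIKE '%x%'".into(), (1_000, 900)); // 10% effective

    let (row_filters, post_scan) = tracker.partition_filters(
        vec!["a = 1".into(), "b LIKE '%x%'".into(), "c > 0".into()],
        0.8,
    );
    assert_eq!(row_filters, vec!["a = 1".to_string()]);
    assert_eq!(post_scan.len(), 2); // the unproven `c > 0` stays post-scan
}
```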