From b13bd645720a15f3d421fee10d12e5828b272278 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 8 Jan 2026 13:07:24 -0500 Subject: [PATCH 1/2] feat: adaptive filter selectivity tracking for Parquet row filters --- datafusion/common/src/config.rs | 10 + .../common/src/file_options/parquet_writer.rs | 4 + datafusion/core/src/dataframe/parquet.rs | 9 +- .../src/datasource/physical_plan/parquet.rs | 3 +- .../physical_optimizer/filter_pushdown.rs | 5 + datafusion/core/tests/sql/explain_analyze.rs | 2 +- .../benches/parquet_nested_filter_pushdown.rs | 4 +- .../datasource-parquet/src/file_format.rs | 7 + datafusion/datasource-parquet/src/metrics.rs | 7 + datafusion/datasource-parquet/src/mod.rs | 5 + datafusion/datasource-parquet/src/opener.rs | 368 ++++++++++++-- .../datasource-parquet/src/row_filter.rs | 283 ++++++++++- .../datasource-parquet/src/selectivity.rs | 455 ++++++++++++++++++ datafusion/datasource-parquet/src/source.rs | 46 ++ .../proto/datafusion_common.proto | 4 + datafusion/proto-common/src/from_proto/mod.rs | 3 + .../proto-common/src/generated/pbjson.rs | 22 + .../proto-common/src/generated/prost.rs | 9 + datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 9 + .../proto/src/logical_plan/file_formats.rs | 4 + datafusion/proto/src/physical_plan/mod.rs | 8 + .../test_files/information_schema.slt | 2 + docs/source/user-guide/configs.md | 1 + 24 files changed, 1228 insertions(+), 43 deletions(-) create mode 100644 datafusion/datasource-parquet/src/selectivity.rs diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0ba587bbc6961..9c335c9708a49 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -751,6 +751,16 @@ config_namespace! { /// parquet reader setting. 0 means no caching. pub max_predicate_cache_size: Option, default = None + /// (reading) Minimum filter effectiveness threshold for adaptive filter + /// pushdown. + /// Only filters that filter out at least this fraction of rows will be + /// promoted to row filters during adaptive filter pushdown. + /// A value of 1.0 means only filters that filter out all rows will be + /// promoted. A value of 0.0 means all filters will be promoted. + /// Because there can be a high I/O cost to pushing down ineffective filters, + /// recommended values are in the range [0.8, 0.95], depending on random I/0 costs. 
+ pub filter_effectiveness_threshold: f64, default = 0.8 + // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index f6608d16c1022..0855c15a4c74d 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -209,6 +209,7 @@ impl ParquetOptions { coerce_int96: _, // not used for writer props skip_arrow_metadata: _, max_predicate_cache_size: _, + filter_effectiveness_threshold: _, // not used for writer props } = self; let mut builder = WriterProperties::builder() @@ -460,6 +461,7 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + filter_effectiveness_threshold: defaults.filter_effectiveness_threshold, } } @@ -574,6 +576,8 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + filter_effectiveness_threshold: global_options_defaults + .filter_effectiveness_threshold, }, column_specific_options, key_value_metadata, diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 54dadfd78cbc2..69156bf6756f2 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -152,7 +152,14 @@ mod tests { let plan = df.explain(false, false)?.collect().await?; // Filters all the way to Parquet let formatted = pretty::pretty_format_batches(&plan)?.to_string(); - assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}"); + let data_source_exec_row = formatted + .lines() + .find(|line| line.contains("DataSourceExec:")) + .unwrap(); + assert!( + data_source_exec_row.contains("predicate=id@0 = 1"), + "{formatted}" + ); Ok(()) } diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 4c6d915d5bcaa..5992f158824c8 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -169,7 +169,8 @@ mod tests { if self.pushdown_predicate { source = source .with_pushdown_filters(true) - .with_reorder_filters(true); + .with_reorder_filters(true) + .with_filter_effectiveness_threshold(0.0); } else { source = source.with_pushdown_filters(false); } diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index b3ed8d9653fe1..228897af2b2b0 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -1731,6 +1731,11 @@ async fn test_topk_dynamic_filter_pushdown_integration() { let mut cfg = SessionConfig::new(); cfg.options_mut().execution.parquet.pushdown_filters = true; cfg.options_mut().execution.parquet.max_row_group_size = 128; + // Always pushdown filters into row filters for this test + cfg.options_mut() + .execution + .parquet + .filter_effectiveness_threshold = 0.0; let ctx = SessionContext::new_with_config(cfg); ctx.register_object_store( ObjectStoreUrl::parse("memory://").unwrap().as_ref(), diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 5f62f7204eff1..e7d5e28a44803 100644 --- 
a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -862,7 +862,7 @@ async fn parquet_explain_analyze() { .to_string(); // should contain aggregated stats - assert_contains!(&formatted, "output_rows=8"); + assert_contains!(&formatted, "output_rows=5"); assert_contains!( &formatted, "row_groups_pruned_bloom_filter=1 total \u{2192} 1 matched" diff --git a/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs index ed92031f86c6b..4a1eff561c76e 100644 --- a/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs +++ b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs @@ -24,7 +24,7 @@ use arrow::array::{ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use criterion::{Criterion, Throughput, criterion_group, criterion_main}; use datafusion_common::ScalarValue; -use datafusion_datasource_parquet::{ParquetFileMetrics, build_row_filter}; +use datafusion_datasource_parquet::{ParquetFileMetrics, build_row_filter, SelectivityTracker}; use datafusion_expr::{Expr, col}; use datafusion_functions_nested::expr_fn::array_has; use datafusion_physical_expr::planner::logical2physical; @@ -116,7 +116,7 @@ fn scan_with_predicate( let builder = if pushdown { if let Some(row_filter) = - build_row_filter(predicate, file_schema, &metadata, false, &file_metrics)? + build_row_filter(predicate, file_schema, &metadata, false, &file_metrics, &SelectivityTracker::default())? { builder.with_row_filter(row_filter) } else { diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index d59b42ed15d15..485e6688fae7f 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -507,6 +507,12 @@ impl FileFormat for ParquetFormat { ) -> Result> { let mut metadata_size_hint = None; + let filter_effectiveness_threshold = state + .config_options() + .execution + .parquet + .filter_effectiveness_threshold; + if let Some(metadata) = self.metadata_size_hint() { metadata_size_hint = Some(metadata); } @@ -518,6 +524,7 @@ impl FileFormat for ParquetFormat { .cloned() .ok_or_else(|| internal_datafusion_err!("Expected ParquetSource"))?; source = source.with_table_parquet_options(self.options.clone()); + source = source.with_filter_pushdown_selectivity(filter_effectiveness_threshold); // Use the CachedParquetFileReaderFactory let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 2d6fb69270bf3..1496da8d50775 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -91,6 +91,8 @@ pub struct ParquetFileMetrics { /// number of rows that were stored in the cache after evaluating predicates /// reused for the output. 
pub predicate_cache_records: Gauge, + //// Time spent applying filters + pub filter_apply_time: Time, } impl ParquetFileMetrics { @@ -186,6 +188,10 @@ impl ParquetFileMetrics { .with_new_label("filename", filename.to_string()) .gauge("predicate_cache_records", partition); + let filter_apply_time = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .subset_time("filter_apply_time", partition); + Self { files_ranges_pruned_statistics, predicate_evaluation_errors, @@ -205,6 +211,7 @@ impl ParquetFileMetrics { scan_efficiency_ratio, predicate_cache_inner_records, predicate_cache_records, + filter_apply_time, } } } diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 0e137a706fad7..00b148b634d90 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -29,6 +29,7 @@ mod page_filter; mod reader; mod row_filter; mod row_group_filter; +mod selectivity; mod sort; pub mod source; mod supported_predicates; @@ -39,7 +40,11 @@ pub use file_format::*; pub use metrics::ParquetFileMetrics; pub use page_filter::PagePruningAccessPlanFilter; pub use reader::*; // Expose so downstream crates can use it +pub use row_filter::FilterMetrics; +pub use row_filter::RowFilterWithMetrics; pub use row_filter::build_row_filter; +pub use row_filter::build_row_filter_with_metrics; pub use row_filter::can_expr_be_pushed_down_with_schemas; pub use row_group_filter::RowGroupAccessPlanFilter; +pub use selectivity::SelectivityTracker; pub use writer::plan_to_parquet; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f87a30265a17b..47de3fe6c9b03 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -19,16 +19,19 @@ use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_group_filter::RowGroupAccessPlanFilter; +use crate::selectivity::{PartitionedFilters, SelectivityTracker}; use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, }; -use arrow::array::{RecordBatch, RecordBatchOptions}; +use arrow::array::{AsArray, RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; -use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs}; +use datafusion_physical_expr::split_conjunction; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; +use itertools::Itertools; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -39,12 +42,13 @@ use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::stats::Precision; use datafusion_common::{ ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err, + internal_datafusion_err, }; use datafusion_datasource::{PartitionedFile, TableSchema}; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ - PhysicalExpr, is_dynamic_physical_expr, + PhysicalExpr, fmt_sql, is_dynamic_physical_expr, }; use datafusion_physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics, @@ -120,6 +124,9 @@ pub(super) struct 
ParquetOpener { pub max_predicate_cache_size: Option, /// Whether to read row groups in reverse order pub reverse_row_groups: bool, + /// Shared selectivity tracker for adaptive filter reordering. + /// Each opener reads stats and decides which filters to push down. + pub selectivity_tracker: Arc>, } /// Represents a prepared access plan with optional row selection @@ -280,7 +287,7 @@ impl FileOpener for ParquetOpener { let reverse_row_groups = self.reverse_row_groups; let preserve_order = self.preserve_order; - + let selectivity_tracker = Arc::clone(&self.selectivity_tracker); Ok(Box::pin(async move { #[cfg(feature = "parquet_encryption")] let file_decryption_properties = encryption_context @@ -460,27 +467,10 @@ impl FileOpener for ParquetOpener { // --------------------------------------------------------------------- // Filter pushdown: evaluate predicates during scan - if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { - let row_filter = row_filter::build_row_filter( - &predicate, - &physical_file_schema, - builder.metadata(), - reorder_predicates, - &file_metrics, - ); + // First, partition filters based on selectivity tracking + // filter_metrics will be populated if we successfully build a row filter + let mut filter_metrics: Vec = vec![]; - match row_filter { - Ok(Some(filter)) => { - builder = builder.with_row_filter(filter); - } - Ok(None) => {} - Err(e) => { - debug!( - "Ignoring error building row filter for '{predicate:?}': {e}" - ); - } - }; - }; if force_filter_selections { builder = builder.with_row_selection_policy(RowSelectionPolicy::Selectors); @@ -493,7 +483,6 @@ impl FileOpener for ParquetOpener { // Determine which row groups to actually read. The idea is to skip // as many row groups as possible based on the metadata and query let file_metadata = Arc::clone(builder.metadata()); - let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); let rg_metadata = file_metadata.row_groups(); // track which row groups to actually read let access_plan = @@ -505,7 +494,7 @@ impl FileOpener for ParquetOpener { } // If there is a predicate that can be evaluated against the metadata - if let Some(predicate) = predicate.as_ref() { + if let Some(predicate) = pruning_predicate.as_ref() { if enable_row_group_stats_pruning { row_groups.prune_by_statistics( &physical_file_schema, @@ -601,8 +590,108 @@ impl FileOpener for ParquetOpener { // metrics from the arrow reader itself let arrow_reader_metrics = ArrowReaderMetrics::enabled(); - let indices = projection.column_indices(); - let mask = ProjectionMask::roots(builder.parquet_schema(), indices); + // Acquire tracker lock once for both partitioning and row filter building. + // We hold it through the projection extension code (which doesn't need it) + // to avoid the deadlock from release-and-reacquire pattern. 
+ let (post_scan_filters, original_projection_len, projection, mask) = { + let tracker = selectivity_tracker.read(); + + let PartitionedFilters { + row_filters, + post_scan, + } = if let Some(predicate) = + pushdown_filters.then_some(predicate.as_ref()).flatten() + { + // Split predicate into conjuncts and partition based on selectivity + let conjuncts: Vec> = + split_conjunction(predicate) + .into_iter() + .map(Arc::clone) + .collect(); + // #[cfg(debug_assertions)] + // { + // use datafusion_physical_expr_common::physical_expr::fmt_sql; + // for (expr, selectivity) in tracker.iter() { + // println!( + // "effectiveness for expr {}: {:.2}%", + // fmt_sql(expr.as_ref()), + // selectivity.effectiveness() * 100.0, + // ); + // } + // } + tracker.partition_filters(conjuncts) + } else { + PartitionedFilters { + row_filters: vec![], + post_scan: vec![], + } + }; + + // #[cfg(debug_assertions)] + // { + // println!( + // "ParquetOpener: pushing down {} filters, deferring {} filters", + // row_filters.len(), + // post_scan.len(), + // ); + // if !row_filters.is_empty() { + // println!(" Row filters:"); + // for filter in &row_filters { + // use datafusion_physical_expr_common::physical_expr::fmt_sql; + // println!(" {}", fmt_sql(filter.as_ref())); + // } + // } + // } + + // Extend projection with post-scan filter expressions BEFORE computing + // column indices, so the mask includes columns needed by filters. + let original_projection_len = projection.as_ref().len(); + let projection = if post_scan.is_empty() { + projection + } else { + let mut extended_exprs: Vec = + projection.iter().cloned().collect(); + + for (i, filter) in post_scan.iter().enumerate() { + extended_exprs.push(ProjectionExpr { + expr: Arc::clone(filter), + alias: format!("__filter_{i}"), + }); + } + + ProjectionExprs::new(extended_exprs) + }; + + // Now compute column indices (includes filter columns) + let indices = projection.column_indices(); + let mask = ProjectionMask::roots(builder.parquet_schema(), indices); + + // Build row filter with only the high-effectiveness filters + if !row_filters.is_empty() { + let row_filter_result = row_filter::build_row_filter_with_metrics( + row_filters, + &physical_file_schema, + builder.metadata(), + reorder_predicates, + &file_metrics, + &tracker, + ); + + match row_filter_result { + Ok(Some(result)) => { + builder = builder.with_row_filter(result.row_filter); + filter_metrics = result.filter_metrics; + } + Ok(None) => {} + Err(e) => { + debug!("Ignoring error building row filter: {e}"); + } + }; + } + + (post_scan, original_projection_len, projection, mask) + }; + // tracker lock released here let stream = builder .with_projection(mask) @@ -615,6 +704,7 @@ impl FileOpener for ParquetOpener { let predicate_cache_inner_records = file_metrics.predicate_cache_inner_records.clone(); let predicate_cache_records = file_metrics.predicate_cache_records.clone(); + let filter_apply_time = file_metrics.filter_apply_time.clone(); let stream_schema = Arc::clone(stream.schema()); // Check if we need to replace the schema to handle things like differing nullability or metadata. 
@@ -629,6 +719,19 @@ impl FileOpener for ParquetOpener { let projector = projection.make_projector(&stream_schema)?; + // Pre-compute the data schema for post-scan filtering (excludes filter columns) + let post_scan_data_schema = if !post_scan_filters.is_empty() { + let proj_schema = projector.output_schema(); + Some(Arc::new(arrow::datatypes::Schema::new( + proj_schema.fields()[..original_projection_len].to_vec(), + ))) + } else { + None + }; + + // Clone for use in the stream mapping closure + let post_scan_tracker = Arc::clone(&selectivity_tracker); + let stream = stream.map_err(DataFusionError::from).map(move |b| { b.and_then(|mut b| { copy_arrow_reader_metrics( @@ -637,6 +740,19 @@ impl FileOpener for ParquetOpener { &predicate_cache_records, ); b = projector.project_batch(&b)?; + + // Apply post-scan filters if present + if let Some(ref data_schema) = post_scan_data_schema { + let start = datafusion_common::instant::Instant::now(); + b = apply_post_scan_filters( + b, + Arc::clone(data_schema), + &post_scan_filters, + &post_scan_tracker, + )?; + filter_apply_time.add_elapsed(start); + } + if replace_schema { // Ensure the output batch has the expected schema. // This handles things like schema level and field level metadata, which may not be present @@ -664,6 +780,19 @@ impl FileOpener for ParquetOpener { // ---------------------------------------------------------------------- // Step: wrap the stream so a dynamic filter can stop the file scan early // ---------------------------------------------------------------------- + + // Wrap with SelectivityUpdatingStream if we have filter metrics to track + let stream = if !filter_metrics.is_empty() { + SelectivityUpdatingStream::new( + stream, + filter_metrics, + Arc::clone(&selectivity_tracker), + ) + .boxed() + } else { + stream.boxed() + }; + if let Some(file_pruner) = file_pruner { Ok(EarlyStoppingStream::new( stream, @@ -672,12 +801,94 @@ impl FileOpener for ParquetOpener { ) .boxed()) } else { - Ok(stream.boxed()) + Ok(stream) } })) } } +/// Apply post-scan filters to a record batch. +/// +/// This function: +/// 1. Extracts the filter columns (boolean arrays) from the end of the batch +/// 2. Tracks per-filter selectivity for adaptive filter reordering +/// 3. Combines them with AND +/// 4. Applies the combined filter mask to the data columns +/// 5. Returns a new batch with only the data columns +/// +/// The selectivity tracking here provides accurate measurements because all +/// post-scan filters see the same input rows (unlike row filters which run +/// sequentially and see progressively fewer rows). +fn apply_post_scan_filters( + batch: RecordBatch, + data_schema: SchemaRef, + filter_exprs: &[Arc], + selectivity_tracker: &parking_lot::RwLock, +) -> Result { + use arrow::array::as_boolean_array; + use arrow::compute::{and, filter_record_batch}; + + // Fast path: no work to be done + if filter_exprs.is_empty() { + return Ok(batch); + } + + let (_batch_schema, columns, num_rows) = batch.into_parts(); + let num_data_cols = data_schema.fields().len(); + + // Extract data columns and filter columns + let data_columns: Vec<_> = columns[..num_data_cols].to_vec(); + let filter_columns: Vec<_> = columns[num_data_cols..].to_vec(); + + // Track per-filter selectivity before combining. + // This gives us accurate marginal selectivity since all filters see the same input. 
+ let input_rows = num_rows as u64; + if input_rows > 0 { + let mut rows_matched = Vec::with_capacity(filter_exprs.len()); + for (expr, col) in filter_exprs.iter().zip_eq(filter_columns.iter()) { + let bool_arr = col.as_boolean_opt().ok_or_else(|| internal_datafusion_err!( + "Expected filter expression to evaluate to boolean, got {}\nFilter expression: {}", + col.data_type(), + fmt_sql(expr.as_ref()) + ))?; + rows_matched.push(bool_arr.true_count() as u64); + } + let mut tracker = selectivity_tracker.write(); + for (expr, rows_matched) in filter_exprs.iter().zip_eq(rows_matched.into_iter()) { + tracker.update(expr, rows_matched, input_rows); + } + } + + // Combine filter columns with AND (avoiding unnecessary clones) + let combined_mask = match filter_columns.len() { + 0 => None, + 1 => Some(as_boolean_array(filter_columns[0].as_ref()).clone()), + _ => { + // Start with and() of first two - creates a new array, no clone needed + let first = as_boolean_array(filter_columns[0].as_ref()); + let second = as_boolean_array(filter_columns[1].as_ref()); + let mut acc = and(first, second)?; + for col in &filter_columns[2..] { + acc = and(&acc, as_boolean_array(col.as_ref()))?; + } + Some(acc) + } + }; + + // Create batch with data columns only + let opts = RecordBatchOptions::new().with_row_count(Some(num_rows)); + let data_batch = RecordBatch::try_new_with_options(data_schema, data_columns, &opts)?; + + // Apply the filter + let filtered = if let Some(mask) = combined_mask { + filter_record_batch(&data_batch, &mask)? + } else { + data_batch + }; + + Ok(filtered) +} + /// Copies metrics from ArrowReaderMetrics (the metrics collected by the /// arrow-rs parquet reader) to the parquet file metrics for DataFusion fn copy_arrow_reader_metrics( @@ -835,6 +1046,92 @@ where } } +/// A stream wrapper that updates the [`SelectivityTracker`] after each batch. +/// +/// This captures per-filter metrics during stream processing and updates the shared +/// selectivity tracker incrementally after each batch. This allows the system to +/// learn filter effectiveness quickly, potentially promoting effective filters +/// to row filters mid-stream for subsequent files. +struct SelectivityUpdatingStream { + /// The inner stream producing record batches + inner: S, + /// Has the stream finished processing? + done: bool, + /// Per-filter metrics collected during stream processing + filter_metrics: Vec, + /// Last reported values for each filter (to compute deltas) + last_reported: Vec<(u64, u64)>, // (matched, total) per filter + /// Shared selectivity tracker to update when stream completes + selectivity_tracker: Arc>, +} + +impl SelectivityUpdatingStream { + fn new( + stream: S, + filter_metrics: Vec, + selectivity_tracker: Arc>, + ) -> Self { + let last_reported = vec![(0, 0); filter_metrics.len()]; + Self { + inner: stream, + done: false, + filter_metrics, + last_reported, + selectivity_tracker, + } + } + + /// Update the selectivity tracker with metrics accumulated since last update. + /// Uses delta tracking to avoid double-counting rows. 
+ fn update_selectivity(&mut self) { + let mut tracker = self.selectivity_tracker.write(); + for (i, metrics) in self.filter_metrics.iter().enumerate() { + let current_matched = metrics.get_rows_matched() as u64; + let current_total = metrics.get_rows_total() as u64; + + let (last_matched, last_total) = self.last_reported[i]; + let delta_matched = current_matched - last_matched; + let delta_total = current_total - last_total; + + // Only update if we have new rows since last update + if delta_total > 0 { + tracker.update(&metrics.expr, delta_matched, delta_total); + self.last_reported[i] = (current_matched, current_total); + } + } + } +} + +impl Stream for SelectivityUpdatingStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + if self.done { + return Poll::Ready(None); + } + + match ready!(self.inner.poll_next_unpin(cx)) { + None => { + // Stream completed - final update to selectivity tracker + self.done = true; + self.update_selectivity(); + Poll::Ready(None) + } + Some(result) => { + // Update selectivity after each batch for faster learning + self.update_selectivity(); + Poll::Ready(Some(result)) + } + } + } +} + #[derive(Default)] struct EncryptionContext { #[cfg(feature = "parquet_encryption")] @@ -1017,7 +1314,10 @@ mod test { use std::sync::Arc; use super::{ConstantColumns, constant_columns_from_stats}; - use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener}; + use crate::{ + DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener, + selectivity::SelectivityTracker, + }; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ @@ -1198,6 +1498,9 @@ mod test { max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, preserve_order: self.preserve_order, + selectivity_tracker: Arc::new(parking_lot::RwLock::new( + SelectivityTracker::default(), + )), } } } @@ -1626,7 +1929,10 @@ mod test { assert_eq!(num_batches, 1); assert_eq!(num_rows, 1); - // Filter should not match the partition value or the data value + // Filter should not match the partition value or the data value. + // With adaptive selectivity tracking, unknown filters are pushed down + // as row filters initially. The row filter prunes all rows during decoding, + // resulting in no batches being returned. let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 2924208c5bd99..efd888f2921fa 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -87,8 +87,49 @@ use datafusion_physical_expr::{PhysicalExpr, split_conjunction}; use datafusion_physical_plan::metrics; use super::ParquetFileMetrics; +use super::selectivity::SelectivityTracker; use super::supported_predicates::supports_list_predicates; +/// Metrics for a single filter predicate, paired with the original expression. +/// +/// These metrics are tracked during row filter evaluation and can be used +/// to update selectivity statistics after processing completes. 
+#[derive(Debug, Clone)] +pub struct FilterMetrics { + /// The original filter expression (before any rewriting for the file schema) + pub expr: Arc, + /// Counter for rows that matched (passed) this filter + rows_matched: metrics::Count, + /// Counter for rows that were pruned (filtered out) by this filter + rows_pruned: metrics::Count, +} + +impl FilterMetrics { + /// Get the number of rows that matched this filter + pub fn get_rows_matched(&self) -> usize { + self.rows_matched.value() + } + + /// Get the number of rows that were pruned by this filter + pub fn get_rows_pruned(&self) -> usize { + self.rows_pruned.value() + } + + /// Get the total number of rows evaluated by this filter + pub fn get_rows_total(&self) -> usize { + self.get_rows_matched() + self.get_rows_pruned() + } +} + +/// Result of building a row filter, containing both the filter and per-expression metrics. +pub struct RowFilterWithMetrics { + /// The row filter to apply during parquet decoding + pub row_filter: RowFilter, + /// Metrics for each filter expression, in the order they appear in the row filter. + /// These can be read after the stream completes to update selectivity statistics. + pub filter_metrics: Vec, +} + /// A "compiled" predicate passed to `ParquetRecordBatchStream` to perform /// row-level filtering during parquet decoding. /// @@ -566,6 +607,9 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result Result> { let rows_pruned = &file_metrics.pushdown_rows_pruned; let rows_matched = &file_metrics.pushdown_rows_matched; @@ -612,9 +657,21 @@ pub fn build_row_filter( if reorder_predicates { candidates.sort_unstable_by(|c1, c2| { - match c1.can_use_index.cmp(&c2.can_use_index) { - Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes), - ord => ord, + let eff1 = selectivity_tracker.get_effectiveness(&c1.expr); + let eff2 = selectivity_tracker.get_effectiveness(&c2.expr); + + match (eff1, eff2) { + // Both have known effectiveness: sort by effectiveness descending + // (higher effectiveness = more selective = should come first) + (Some(e1), Some(e2)) => e2.partial_cmp(&e1).unwrap_or(Ordering::Equal), + // Known effectiveness comes before unknown + (Some(_), None) => Ordering::Less, + (None, Some(_)) => Ordering::Greater, + // Both unknown: fall back to existing heuristics + (None, None) => match c1.can_use_index.cmp(&c2.can_use_index) { + Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes), + ord => ord, + }, } }); } @@ -654,6 +711,211 @@ pub fn build_row_filter( .map(|filters| Some(RowFilter::new(filters))) } +/// Build a [`RowFilter`] from the given predicate expression, returning per-expression metrics. +/// +/// This is similar to [`build_row_filter`] but additionally returns [`FilterMetrics`] for each +/// filter expression. The metrics can be read after stream processing completes to update +/// selectivity statistics. 
+/// +/// # Arguments +/// * `expr` - The filter predicate, already adapted to reference columns in `file_schema` +/// * `file_schema` - The Arrow schema of the parquet file +/// * `metadata` - Parquet file metadata used for cost estimation +/// * `reorder_predicates` - If true, reorder predicates to minimize I/O +/// * `file_metrics` - Metrics for tracking filter performance +/// * `selectivity_tracker` - Tracker containing effectiveness data for filter reordering +/// +/// # Returns +/// * `Ok(Some(result))` containing the row filter and per-expression metrics +/// * `Ok(None)` if no expressions can be used as a RowFilter +/// * `Err(e)` if an error occurs while building the filter +pub fn build_row_filter_with_metrics( + predicates: Vec>, + file_schema: &SchemaRef, + metadata: &ParquetMetaData, + reorder_predicates: bool, + file_metrics: &ParquetFileMetrics, + selectivity_tracker: &SelectivityTracker, +) -> Result> { + let rows_pruned = &file_metrics.pushdown_rows_pruned; + let rows_matched = &file_metrics.pushdown_rows_matched; + let time = &file_metrics.row_pushdown_eval_time; + + // Determine which conjuncts can be evaluated as ArrowPredicates, if any + // We need to preserve the original expressions before building candidates + let mut candidates_with_exprs: Vec<(Arc, FilterCandidate)> = + predicates + .into_iter() + .filter_map(|expr| { + let original_expr = Arc::clone(&expr); + FilterCandidateBuilder::new(expr, Arc::clone(file_schema)) + .build(metadata) + .ok() + .flatten() + .map(|candidate| (original_expr, candidate)) + }) + .collect(); + + // no candidates + if candidates_with_exprs.is_empty() { + return Ok(None); + } + + if reorder_predicates { + candidates_with_exprs.sort_unstable_by(|(_, c1), (_, c2)| { + let eff1 = selectivity_tracker.get_effectiveness(&c1.expr); + let eff2 = selectivity_tracker.get_effectiveness(&c2.expr); + + match (eff1, eff2) { + // Both have known effectiveness: sort by effectiveness descending + // (higher effectiveness = more selective = should come first) + (Some(e1), Some(e2)) => e2.partial_cmp(&e1).unwrap_or(Ordering::Equal), + // Either unknown: fall back to existing heuristics + _ => match c1.can_use_index.cmp(&c2.can_use_index) { + Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes), + ord => ord, + }, + } + }); + } + + let total_candidates = candidates_with_exprs.len(); + let mut filter_metrics = Vec::with_capacity(total_candidates); + let mut arrow_predicates = Vec::with_capacity(total_candidates); + + for (idx, (original_expr, candidate)) in candidates_with_exprs.into_iter().enumerate() + { + let is_last = idx == total_candidates - 1; + + // Create per-predicate metrics for selectivity tracking + let predicate_rows_matched = metrics::Count::new(); + let predicate_rows_pruned = metrics::Count::new(); + + // Store references to the metrics for the filter + filter_metrics.push(FilterMetrics { + expr: original_expr, + rows_matched: predicate_rows_matched.clone(), + rows_pruned: predicate_rows_pruned.clone(), + }); + + // For global metrics tracking: + // - All predicates contribute to the global pruned counter + // - Only the last predicate contributes to the global matched counter + let global_rows_pruned = rows_pruned.clone(); + let global_rows_matched = if is_last { + rows_matched.clone() + } else { + metrics::Count::new() + }; + + // Create a predicate that updates both per-predicate and global metrics + let arrow_pred = DatafusionArrowPredicateWithMetrics::try_new( + candidate, + metadata, + predicate_rows_pruned, + 
predicate_rows_matched, + global_rows_pruned, + global_rows_matched, + time.clone(), + )?; + + arrow_predicates.push(Box::new(arrow_pred) as Box); + } + + Ok(Some(RowFilterWithMetrics { + row_filter: RowFilter::new(arrow_predicates), + filter_metrics, + })) +} + +/// A variant of [`DatafusionArrowPredicate`] that tracks both per-predicate and global metrics. +/// +/// This is used by [`build_row_filter_with_metrics`] to enable selectivity tracking +/// while maintaining backward compatibility with the global metrics system. +#[derive(Debug)] +struct DatafusionArrowPredicateWithMetrics { + /// the filter expression + physical_expr: Arc, + /// Path to the columns in the parquet schema required to evaluate the expression + projection_mask: ProjectionMask, + /// Per-predicate: how many rows were filtered out by this predicate + local_rows_pruned: metrics::Count, + /// Per-predicate: how many rows passed this predicate + local_rows_matched: metrics::Count, + /// Global: how many rows were filtered out (shared across predicates) + global_rows_pruned: metrics::Count, + /// Global: how many rows passed (only tracked by last predicate) + global_rows_matched: metrics::Count, + /// how long was spent evaluating this predicate + time: metrics::Time, +} + +impl DatafusionArrowPredicateWithMetrics { + fn try_new( + candidate: FilterCandidate, + metadata: &ParquetMetaData, + local_rows_pruned: metrics::Count, + local_rows_matched: metrics::Count, + global_rows_pruned: metrics::Count, + global_rows_matched: metrics::Count, + time: metrics::Time, + ) -> Result { + let physical_expr = + reassign_expr_columns(candidate.expr, &candidate.filter_schema)?; + + Ok(Self { + physical_expr, + // Use leaf indices: when nested columns are involved, we must specify + // leaf (primitive) column indices in the Parquet schema so the decoder + // can properly project and filter nested structures. 
+ projection_mask: ProjectionMask::leaves( + metadata.file_metadata().schema_descr(), + candidate.projection.leaf_indices.iter().copied(), + ), + local_rows_pruned, + local_rows_matched, + global_rows_pruned, + global_rows_matched, + time, + }) + } +} + +impl ArrowPredicate for DatafusionArrowPredicateWithMetrics { + fn projection(&self) -> &ProjectionMask { + &self.projection_mask + } + + fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult { + let mut timer = self.time.timer(); + + self.physical_expr + .evaluate(&batch) + .and_then(|v| v.into_array(batch.num_rows())) + .and_then(|array| { + let bool_arr = as_boolean_array(&array)?.clone(); + let num_matched = bool_arr.true_count(); + let num_pruned = bool_arr.len() - num_matched; + + // Update per-predicate metrics (for selectivity tracking) + self.local_rows_pruned.add(num_pruned); + self.local_rows_matched.add(num_matched); + + // Update global metrics (for backward compatibility) + self.global_rows_pruned.add(num_pruned); + self.global_rows_matched.add(num_matched); + + timer.stop(); + Ok(bool_arr) + }) + .map_err(|e| { + ArrowError::ComputeError(format!( + "Error evaluating filter predicate: {e:?}" + )) + }) + } +} + #[cfg(test)] mod test { use super::*; @@ -936,11 +1198,18 @@ mod test { let metrics = ExecutionPlanMetricsSet::new(); let file_metrics = ParquetFileMetrics::new(0, &format!("{func_name}.parquet"), &metrics); + let selectivity_tracker = SelectivityTracker::default(); - let row_filter = - build_row_filter(&expr, &file_schema, &metadata, false, &file_metrics) - .expect("building row filter") - .expect("row filter should exist"); + let row_filter = build_row_filter( + &expr, + &file_schema, + &metadata, + false, + &file_metrics, + &selectivity_tracker, + ) + .expect("building row filter") + .expect("row filter should exist"); let reader = parquet_reader_builder .with_row_filter(row_filter) diff --git a/datafusion/datasource-parquet/src/selectivity.rs b/datafusion/datasource-parquet/src/selectivity.rs new file mode 100644 index 0000000000000..7e9fb118d46d2 --- /dev/null +++ b/datafusion/datasource-parquet/src/selectivity.rs @@ -0,0 +1,455 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Adaptive filter selectivity tracking for Parquet row filters. +//! +//! This module provides infrastructure to track filter effectiveness across files +//! and adaptively decide which filters should be pushed down as row filters vs. +//! applied post-scan. +//! +//! The key insight is that filters with low effectiveness (those that don't filter +//! out many rows) may not be worth the I/O cost of late materialization. By tracking +//! effectiveness across files, we can learn which filters are worth pushing down. 
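To make the intended flow concrete, here is a minimal usage sketch of the tracker this module introduces, modeled on the unit tests at the end of the file (the schema, column name, observation counts, and the 0.8 threshold are illustrative only):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

// `SelectivityTracker` and `PartitionedFilters` are the types defined below.
fn adaptive_pushdown_sketch() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let filter: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        col("a", &schema).unwrap(),
        Operator::Eq,
        lit(5),
    ));

    // Keep a filter as a row filter only if it removes at least 80% of rows.
    let mut tracker = SelectivityTracker::new(0.8);

    // With no statistics yet, the filter is optimistically pushed down.
    let PartitionedFilters { row_filters, post_scan } =
        tracker.partition_filters(vec![Arc::clone(&filter)]);
    assert_eq!((row_filters.len(), post_scan.len()), (1, 0));

    // Observe that 50 of 100 rows matched: effectiveness = 1 - 50/100 = 0.5 < 0.8,
    // so the same filter is demoted to a post-scan filter for subsequent files.
    tracker.update(&filter, 50, 100);
    let PartitionedFilters { row_filters, post_scan } =
        tracker.partition_filters(vec![Arc::clone(&filter)]);
    assert_eq!((row_filters.len(), post_scan.len()), (0, 1));
}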
+ +use std::collections::HashMap; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + +/// Result of partitioning filters based on their effectiveness. +/// +/// Filters are split into two groups: +/// - `row_filters`: Filters that should be pushed down as row filters +/// - `post_scan`: Filters that should be applied after scanning +#[derive(Debug, Clone, Default)] +pub struct PartitionedFilters { + /// Filters to push down as row filters (effective or unknown effectiveness) + pub row_filters: Vec>, + /// Filters to apply post-scan (known to be ineffective) + pub post_scan: Vec>, +} + +/// Wrapper for `Arc` that uses structural Hash/Eq. +/// +/// This is needed because `Arc` uses pointer equality by default, +/// but we want to use the structural equality provided by `DynEq` and `DynHash`. +/// +/// For dynamic expressions (like `DynamicFilterPhysicalExpr`), we use the snapshot +/// of the expression to ensure stable hash/eq values even as the dynamic expression +/// updates. This is critical for HashMap correctness. +#[derive(Clone, Debug)] +pub struct ExprKey(Arc); + +impl ExprKey { + /// Create a new ExprKey from an expression. + /// + /// For dynamic expressions, this takes a snapshot to ensure stable hash/eq. + pub fn new(expr: &Arc) -> Self { + // Try to get a snapshot; if available, use it for stable hash/eq + let stable_expr = expr + .snapshot() + .ok() + .flatten() + .unwrap_or_else(|| Arc::clone(expr)); + Self(stable_expr) + } +} + +impl Hash for ExprKey { + fn hash(&self, state: &mut H) { + // dyn PhysicalExpr implements Hash, which delegates to dyn_hash + self.0.as_ref().hash(state); + } +} + +impl PartialEq for ExprKey { + fn eq(&self, other: &Self) -> bool { + self.0.as_ref() == other.0.as_ref() + } +} + +impl Eq for ExprKey {} + +/// Tracks selectivity statistics for a single filter expression. +#[derive(Debug, Clone, Default)] +pub struct SelectivityStats { + /// Number of rows that matched (passed) the filter + pub rows_matched: u64, + /// Total number of rows evaluated + pub rows_total: u64, +} + +impl SelectivityStats { + /// Create new stats with given values. + pub fn new(rows_matched: u64, rows_total: u64) -> Self { + Self { + rows_matched, + rows_total, + } + } + + /// Returns the filter effectiveness (fraction of rows filtered out). + /// + /// - 1.0 = perfect filter (all rows filtered out) + /// - 0.0 = useless filter (no rows filtered out) + /// + /// Returns 0.0 if no rows have been evaluated (unknown effectiveness). + pub fn effectiveness(&self) -> f64 { + if self.rows_total == 0 { + 0.0 // Unknown, assume ineffective + } else { + 1.0 - (self.rows_matched as f64 / self.rows_total as f64) + } + } + + /// Update stats with new observations. + pub fn update(&mut self, matched: u64, total: u64) { + self.rows_matched += matched; + self.rows_total += total; + } +} + +/// Cross-file selectivity tracker for adaptive filter ordering. +/// +/// This tracker maintains effectiveness statistics for filter expressions +/// across multiple files, allowing the system to learn which filters are +/// worth pushing down as row filters. +#[derive(Debug)] +pub struct SelectivityTracker { + /// Per-expression effectiveness statistics + stats: HashMap, + /// Minimum effectiveness threshold to keep a filter as a row filter. + /// Filters with effectiveness < threshold are demoted to post-scan. 
+ /// Default: 0.8 (must filter out at least 80% of rows) + threshold: f64, +} + +impl Default for SelectivityTracker { + fn default() -> Self { + Self::new(0.8) + } +} + +impl SelectivityTracker { + /// Create a new tracker with the given effectiveness threshold. + /// + /// # Arguments + /// * `threshold` - Minimum effectiveness (0.0-1.0) to keep as row filter. + /// Filters with effectiveness < threshold are demoted to post-scan. + pub fn new(threshold: f64) -> Self { + Self { + stats: HashMap::new(), + threshold, + } + } + + /// Get the effectiveness threshold. + pub fn threshold(&self) -> f64 { + self.threshold + } + + /// Get the effectiveness for a filter expression, if known. + pub fn get_effectiveness(&self, expr: &Arc) -> Option { + let key = ExprKey::new(expr); + self.stats.get(&key).map(|s| s.effectiveness()) + } + + /// Partition filters into row_filters and post_scan based on effectiveness. + /// + /// Filters start as row filters (pushed down) to take advantage of late + /// materialization. As we learn their effectiveness, ineffective filters + /// (those that pass most rows) get demoted to post-scan for future files. + /// + /// - Filters with effectiveness >= threshold → row_filters (push down) + /// - Filters with effectiveness < threshold → post_scan (not worth pushing) + /// - Filters with unknown effectiveness → row_filters (try pushing first) + pub fn partition_filters( + &self, + filters: Vec>, + ) -> PartitionedFilters { + // If the selectivity is set to 0.0, all filters are promoted to row filters + // even if we have no stats on them. + if self.threshold == 0.0 { + return PartitionedFilters { + row_filters: filters, + post_scan: Vec::new(), + }; + } + + let mut row_filters = Vec::new(); + let mut post_scan = Vec::new(); + + for filter in filters { + let key = ExprKey::new(&filter); + match self.stats.get(&key) { + Some(stats) if stats.effectiveness() < self.threshold => { + // Known to be ineffective - demote to post-scan + post_scan.push(filter); + } + _ => { + // Unknown or effective - push down as row filter + row_filters.push(filter); + } + } + } + + PartitionedFilters { + row_filters, + post_scan, + } + } + + /// Update stats for a filter expression after processing a file. + pub fn update(&mut self, expr: &Arc, matched: u64, total: u64) { + let key = ExprKey::new(expr); + self.stats.entry(key).or_default().update(matched, total); + } + + /// Get the current stats for a filter expression, if any. + pub fn get_stats(&self, expr: &Arc) -> Option<&SelectivityStats> { + let key = ExprKey::new(expr); + self.stats.get(&key) + } + + /// Iterate all known selectivities. 
+ pub fn iter( + &self, + ) -> impl Iterator, &SelectivityStats)> { + self.stats.iter().map(|(key, stats)| (&key.0, stats)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::{BinaryExpr, col, lit}; + use std::sync::Arc; + + fn test_schema() -> Schema { + Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + ]) + } + + fn make_filter(col_name: &str, value: i32) -> Arc { + let schema = test_schema(); + Arc::new(BinaryExpr::new( + col(col_name, &schema).unwrap(), + Operator::Eq, + lit(value), + )) + } + + #[test] + fn test_expr_key_equality() { + let filter1 = make_filter("a", 5); + let filter2 = make_filter("a", 5); + let filter3 = make_filter("a", 10); + + let key1 = ExprKey::new(&filter1); + let key2 = ExprKey::new(&filter2); + let key3 = ExprKey::new(&filter3); + + // Same expression structure should be equal + assert_eq!(key1, key2); + // Different value should not be equal + assert_ne!(key1, key3); + } + + #[test] + fn test_expr_key_hash() { + use std::collections::hash_map::DefaultHasher; + + let filter1 = make_filter("a", 5); + let filter2 = make_filter("a", 5); + + let key1 = ExprKey::new(&filter1); + let key2 = ExprKey::new(&filter2); + + let mut hasher1 = DefaultHasher::new(); + let mut hasher2 = DefaultHasher::new(); + key1.hash(&mut hasher1); + key2.hash(&mut hasher2); + + assert_eq!(hasher1.finish(), hasher2.finish()); + } + + #[test] + fn test_selectivity_stats_effectiveness() { + // No data - unknown + let stats = SelectivityStats::new(0, 0); + assert_eq!(stats.effectiveness(), 0.0); + + // All rows pass - useless filter + let stats = SelectivityStats::new(100, 100); + assert_eq!(stats.effectiveness(), 0.0); + + // No rows pass - perfect filter + let stats = SelectivityStats::new(0, 100); + assert_eq!(stats.effectiveness(), 1.0); + + // 20% pass = 80% filtered = 0.8 effectiveness + let stats = SelectivityStats::new(20, 100); + assert_eq!(stats.effectiveness(), 0.8); + + // 50% pass = 50% filtered = 0.5 effectiveness + let stats = SelectivityStats::new(50, 100); + assert_eq!(stats.effectiveness(), 0.5); + } + + #[test] + fn test_selectivity_stats_update() { + let mut stats = SelectivityStats::default(); + assert_eq!(stats.rows_matched, 0); + assert_eq!(stats.rows_total, 0); + + stats.update(20, 100); + assert_eq!(stats.rows_matched, 20); + assert_eq!(stats.rows_total, 100); + + stats.update(30, 100); + assert_eq!(stats.rows_matched, 50); + assert_eq!(stats.rows_total, 200); + assert_eq!(stats.effectiveness(), 0.75); // 150/200 filtered = 0.75 + } + + #[test] + fn test_tracker_partition_unknown_filters() { + let tracker = SelectivityTracker::new(0.8); + + let filter1 = make_filter("a", 5); + let filter2 = make_filter("a", 10); + + // Unknown filters should go to row_filters to be tried first + let PartitionedFilters { + row_filters, + post_scan, + } = tracker.partition_filters(vec![filter1.clone(), filter2.clone()]); + + assert_eq!(row_filters.len(), 2); + assert_eq!(post_scan.len(), 0); + } + + #[test] + fn test_tracker_partition_effective_filters() { + let mut tracker = SelectivityTracker::new(0.8); + + let filter1 = make_filter("a", 5); + let filter2 = make_filter("a", 10); + + // Update filter1 with high effectiveness (90% filtered) + tracker.update(&filter1, 10, 100); + + let PartitionedFilters { + row_filters, + post_scan, + } = tracker.partition_filters(vec![filter1.clone(), filter2.clone()]); 
+ + // filter1 is effective (0.9 >= 0.8) → row_filters, filter2 is unknown → row_filters + assert_eq!(row_filters.len(), 2); + assert_eq!(post_scan.len(), 0); + + // Both filters should be in row_filters + assert!( + row_filters + .iter() + .any(|f| ExprKey::new(f) == ExprKey::new(&filter1)) + ); + assert!( + row_filters + .iter() + .any(|f| ExprKey::new(f) == ExprKey::new(&filter2)) + ); + } + + #[test] + fn test_tracker_partition_ineffective_filters() { + let mut tracker = SelectivityTracker::new(0.8); + + let filter1 = make_filter("a", 5); + let filter2 = make_filter("a", 10); + + // Update filter1 with low effectiveness (50% filtered) + tracker.update(&filter1, 50, 100); + + let PartitionedFilters { + row_filters, + post_scan, + } = tracker.partition_filters(vec![filter1.clone(), filter2.clone()]); + + // filter1 is ineffective (0.5 < 0.8) → post_scan, filter2 is unknown → row_filters + assert_eq!(row_filters.len(), 1); + assert_eq!(post_scan.len(), 1); + + // The unknown filter should be in row_filters + assert!( + row_filters + .iter() + .any(|f| ExprKey::new(f) == ExprKey::new(&filter2)) + ); + // The ineffective filter should be in post_scan + assert!( + post_scan + .iter() + .any(|f| ExprKey::new(f) == ExprKey::new(&filter1)) + ); + } + + #[test] + fn test_tracker_threshold_boundary() { + let mut tracker = SelectivityTracker::new(0.8); + + let filter = make_filter("a", 5); + + // Exactly at threshold (80% filtered = 0.8 effectiveness) + tracker.update(&filter, 20, 100); + assert_eq!(tracker.get_effectiveness(&filter), Some(0.8)); + + let PartitionedFilters { + row_filters, + post_scan, + } = tracker.partition_filters(vec![filter.clone()]); + + // At threshold boundary, should stay as row filter (>= threshold) + assert_eq!(row_filters.len(), 1); + assert_eq!(post_scan.len(), 0); + } + + #[test] + fn test_tracker_just_below_threshold() { + let mut tracker = SelectivityTracker::new(0.8); + + let filter = make_filter("a", 5); + + // Just below threshold (79% filtered = 0.79 effectiveness) + tracker.update(&filter, 21, 100); + assert!((tracker.get_effectiveness(&filter).unwrap() - 0.79).abs() < 0.001); + + let PartitionedFilters { + row_filters, + post_scan, + } = tracker.partition_filters(vec![filter.clone()]); + + // Below threshold, should be demoted to post_scan + assert_eq!(row_filters.len(), 0); + assert_eq!(post_scan.len(), 1); + } +} diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 75d87a4cd16fc..3ae78df7ef3d9 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -293,6 +293,11 @@ pub struct ParquetSource { /// so we still need to sort them after reading, so the reverse scan is inexact. /// Used to optimize ORDER BY ... DESC on sorted data. reverse_row_groups: bool, + /// Tracks filter selectivity across files for adaptive filter reordering. + /// Shared across all openers - each opener reads stats and makes its own + /// decision about which filters to push down vs. apply post-scan. + pub(crate) selectivity_tracker: + Arc>, } impl ParquetSource { @@ -318,14 +323,30 @@ impl ParquetSource { #[cfg(feature = "parquet_encryption")] encryption_factory: None, reverse_row_groups: false, + selectivity_tracker: Arc::new(parking_lot::RwLock::new( + crate::selectivity::SelectivityTracker::default(), + )), } } + /// Set the selectivity for converting filters to pre-materialization row filters. 
+ pub fn with_filter_pushdown_selectivity(mut self, selectivity: f64) -> Self { + self.selectivity_tracker = Arc::new(parking_lot::RwLock::new( + crate::selectivity::SelectivityTracker::new(selectivity), + )); + self + } + /// Set the `TableParquetOptions` for this ParquetSource. pub fn with_table_parquet_options( mut self, table_parquet_options: TableParquetOptions, ) -> Self { + // Update the selectivity tracker threshold from the config + let threshold = table_parquet_options.global.filter_effectiveness_threshold; + self.selectivity_tracker = Arc::new(parking_lot::RwLock::new( + crate::selectivity::SelectivityTracker::new(threshold), + )); self.table_parquet_options = table_parquet_options; self } @@ -460,6 +481,30 @@ impl ParquetSource { self.table_parquet_options.global.max_predicate_cache_size } + /// Set the minimum filter effectiveness threshold for adaptive filter pushdown. + /// + /// When `pushdown_filters` is enabled, filters that don't filter out at least + /// this fraction of rows will be demoted from row-level filters to post-scan filters. + /// This helps avoid the I/O cost of late materialization for filters that aren't + /// selective enough. Valid values are 0.0 to 1.0, where 0.8 means filters must + /// filter out at least 80% of rows to remain as row filters. Defaults to 0.8. + pub fn with_filter_effectiveness_threshold(mut self, threshold: f64) -> Self { + self.table_parquet_options + .global + .filter_effectiveness_threshold = threshold; + self.selectivity_tracker = Arc::new(parking_lot::RwLock::new( + crate::selectivity::SelectivityTracker::new(threshold), + )); + self + } + + /// Return the filter effectiveness threshold. + pub fn filter_effectiveness_threshold(&self) -> f64 { + self.table_parquet_options + .global + .filter_effectiveness_threshold + } + #[cfg(feature = "parquet_encryption")] fn get_encryption_factory_with_config( &self, @@ -568,6 +613,7 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, + selectivity_tracker: Arc::clone(&self.selectivity_tracker), }); Ok(opener) } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 62c6bbe85612a..630c736185467 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -603,6 +603,10 @@ message ParquetOptions { oneof max_predicate_cache_size_opt { uint64 max_predicate_cache_size = 33; } + + oneof filter_effectiveness_threshold_opt { + double filter_effectiveness_threshold = 35; + } } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ca8a269958d73..e326627cf52ff 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1090,6 +1090,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + filter_effectiveness_threshold: value.filter_effectiveness_threshold_opt.map(|opt| match opt { + protobuf::parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(v) => v, + }).unwrap_or(0.8f64), }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs 
b/datafusion/proto-common/src/generated/pbjson.rs index b00e7546bba20..1629119df1950 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5728,6 +5728,9 @@ impl serde::Serialize for ParquetOptions { if self.max_predicate_cache_size_opt.is_some() { len += 1; } + if self.filter_effectiveness_threshold_opt.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetOptions", len)?; if self.enable_page_index { struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?; @@ -5893,6 +5896,13 @@ impl serde::Serialize for ParquetOptions { } } } + if let Some(v) = self.filter_effectiveness_threshold_opt.as_ref() { + match v { + parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(v) => { + struct_ser.serialize_field("filterEffectivenessThreshold", v)?; + } + } + } struct_ser.end() } } @@ -5964,6 +5974,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "coerceInt96", "max_predicate_cache_size", "maxPredicateCacheSize", + "filter_effectiveness_threshold", + "filterEffectivenessThreshold", ]; #[allow(clippy::enum_variant_names)] @@ -6000,6 +6012,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterNdv, CoerceInt96, MaxPredicateCacheSize, + FilterEffectivenessThreshold, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6053,6 +6066,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96), "maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize), + "filterEffectivenessThreshold" | "filter_effectiveness_threshold" => Ok(GeneratedField::FilterEffectivenessThreshold), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6104,6 +6118,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_ndv_opt__ = None; let mut coerce_int96_opt__ = None; let mut max_predicate_cache_size_opt__ = None; + let mut filter_effectiveness_threshold_opt__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::EnablePageIndex => { @@ -6312,6 +6327,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0)); } + GeneratedField::FilterEffectivenessThreshold => { + if filter_effectiveness_threshold_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("filterEffectivenessThreshold")); + } + filter_effectiveness_threshold_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(x.0)); + } } } Ok(ParquetOptions { @@ -6347,6 +6368,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_ndv_opt: bloom_filter_ndv_opt__, coerce_int96_opt: coerce_int96_opt__, max_predicate_cache_size_opt: max_predicate_cache_size_opt__, + filter_effectiveness_threshold_opt: filter_effectiveness_threshold_opt__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index a09826a29be52..5dea8c0f5ff70 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -872,6 +872,10 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + #[prost(oneof = "parquet_options::FilterEffectivenessThresholdOpt", tags = "35")] + pub filter_effectiveness_threshold_opt: ::core::option::Option< + parquet_options::FilterEffectivenessThresholdOpt, + >, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -930,6 +934,11 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] + pub enum FilterEffectivenessThresholdOpt { + #[prost(double, tag = "35")] + FilterEffectivenessThreshold(f64), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79e3306a4df1b..d93b4f9e97fa7 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -904,6 +904,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + filter_effectiveness_threshold_opt: Some(protobuf::parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(value.filter_effectiveness_threshold)), }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index a09826a29be52..5dea8c0f5ff70 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -872,6 +872,10 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + #[prost(oneof = "parquet_options::FilterEffectivenessThresholdOpt", tags = "35")] + pub filter_effectiveness_threshold_opt: ::core::option::Option< + 
parquet_options::FilterEffectivenessThresholdOpt, + >, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -930,6 +934,11 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] + pub enum FilterEffectivenessThresholdOpt { + #[prost(double, tag = "35")] + FilterEffectivenessThreshold(f64), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..fae1cd051a518 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -426,6 +426,7 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), + filter_effectiveness_threshold_opt: Some(parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(global_options.global.filter_effectiveness_threshold)), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -525,6 +526,9 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + filter_effectiveness_threshold: proto.filter_effectiveness_threshold_opt.as_ref().map(|opt| match opt { + parquet_options::FilterEffectivenessThresholdOpt::FilterEffectivenessThreshold(threshold) => *threshold, + }).unwrap_or(0.8), } } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index bfba715b91249..0ec2247bc6c9d 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -851,6 +851,14 @@ impl protobuf::PhysicalPlanNode { let mut source = ParquetSource::new(table_schema).with_table_parquet_options(options); + source = source.with_filter_pushdown_selectivity( + ctx.session_config() + .options() + .execution + .parquet + .filter_effectiveness_threshold, + ); + if let Some(predicate) = predicate { source = source.with_predicate(predicate); } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index e31cdbe0aad23..10d6280932f53 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -244,6 +244,7 @@ datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL +datafusion.execution.parquet.filter_effectiveness_threshold 0.8 datafusion.execution.parquet.force_filter_selections false datafusion.execution.parquet.max_predicate_cache_size NULL datafusion.execution.parquet.max_row_group_size 1048576 @@ -381,6 +382,7 @@ datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionar datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. 
datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting +datafusion.execution.parquet.filter_effectiveness_threshold 0.8 (reading) Minimum filter effectiveness threshold for adaptive filter pushdown. Only filters that filter out at least this fraction of rows will be promoted to row filters during adaptive filter pushdown. A value of 1.0 means only filters that filter out all rows will be promoted. A value of 0.0 means all filters will be promoted. Because there can be a high I/O cost to pushing down ineffective filters, recommended values are in the range [0.8, 0.95], depending on random I/O costs. datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index aaba453b3541f..b622232a36090 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -92,6 +92,7 @@ The following configuration settings are available: | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | | datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | +| datafusion.execution.parquet.filter_effectiveness_threshold | 0.8 | (reading) Minimum filter effectiveness threshold for adaptive filter pushdown. Only filters that filter out at least this fraction of rows will be promoted to row filters during adaptive filter pushdown. A value of 1.0 means only filters that filter out all rows will be promoted. A value of 0.0 means all filters will be promoted. 
Because there can be a high I/O cost to pushing down ineffective filters, recommended values are in the range [0.8, 0.95], depending on random I/O costs. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in rows | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | From b27a9c3d21ea7cf82c9faf701d0264ab902b3bcd Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 14 Feb 2026 11:41:50 -0500 Subject: [PATCH 2/2] fix --- .../benches/parquet_nested_filter_pushdown.rs | 15 +++++++++++---- docs/source/user-guide/configs.md | 2 +- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs index 4a1eff561c76e..374009044d1fc 100644 --- a/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs +++ b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs @@ -24,7 +24,9 @@ use arrow::array::{ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use criterion::{Criterion, Throughput, criterion_group, criterion_main}; use datafusion_common::ScalarValue; -use datafusion_datasource_parquet::{ParquetFileMetrics, build_row_filter, SelectivityTracker}; +use datafusion_datasource_parquet::{ + ParquetFileMetrics, SelectivityTracker, build_row_filter, +}; use datafusion_expr::{Expr, col}; use datafusion_functions_nested::expr_fn::array_has; use datafusion_physical_expr::planner::logical2physical; @@ -115,9 +117,14 @@ fn scan_with_predicate( let file_metrics = ParquetFileMetrics::new(0, &path.display().to_string(), &metrics); let builder = if pushdown { - if let Some(row_filter) = - build_row_filter(predicate, file_schema, &metadata, false, &file_metrics, &SelectivityTracker::default())? - { + if let Some(row_filter) = build_row_filter( + predicate, + file_schema, + &metadata, + false, + &file_metrics, + &SelectivityTracker::default(), + )? { builder.with_row_filter(row_filter) } else { builder diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index b622232a36090..089face5bdfeb 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -92,7 +92,7 @@ The following configuration settings are available: | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | | datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. 
| -| datafusion.execution.parquet.filter_effectiveness_threshold | 0.8 | (reading) Minimum filter effectiveness threshold for adaptive filter pushdown. Only filters that filter out at least this fraction of rows will be promoted to row filters during adaptive filter pushdown. A value of 1.0 means only filters that filter out all rows will be promoted. A value of 0.0 means all filters will be promoted. Because there can be a high I/O cost to pushing down ineffective filters, recommended values are in the range [0.8, 0.95], depending on random I/O costs. | +| datafusion.execution.parquet.filter_effectiveness_threshold | 0.8 | (reading) Minimum filter effectiveness threshold for adaptive filter pushdown. Only filters that filter out at least this fraction of rows will be promoted to row filters during adaptive filter pushdown. A value of 1.0 means only filters that filter out all rows will be promoted. A value of 0.0 means all filters will be promoted. Because there can be a high I/O cost to pushing down ineffective filters, recommended values are in the range [0.8, 0.95], depending on random I/O costs. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in rows | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" |
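
A minimal end-to-end usage sketch (not part of the patch; the table name and file path are illustrative) assuming the APIs added above land as shown. It enables row-filter pushdown and raises the effectiveness threshold so that only filters removing at least 90% of rows are kept as Parquet row filters:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Row filters are only built when pushdown_filters is on; the threshold
    // then decides which of those filters survive adaptive demotion.
    let mut cfg = SessionConfig::new();
    cfg.options_mut().execution.parquet.pushdown_filters = true;
    cfg.options_mut()
        .execution
        .parquet
        .filter_effectiveness_threshold = 0.9;

    let ctx = SessionContext::new_with_config(cfg);
    // The option can also be set from SQL:
    //   SET datafusion.execution.parquet.filter_effectiveness_threshold = 0.9;
    ctx.register_parquet("t", "data/t.parquet", ParquetReadOptions::default())
        .await?;
    ctx.sql("SELECT * FROM t WHERE id = 1").await?.show().await?;
    Ok(())
}

At the source level the same knob is exposed through ParquetSource::with_filter_effectiveness_threshold (added above), which also resets the per-source SelectivityTracker to the new threshold.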