Skip to content

Commit daa8da9

Browse files
timsaucerclaude
andcommitted
feat: import FFI physical optimizer rules; drop Python logical rules
Replace the Python-defined OptimizerRule/AnalyzerRule approach with FFI-imported physical optimizer rules. The Python logical-rule approach could observe plans but not transform them: there are no Python constructors for LogicalPlan node variants, so a rule could only return None or the input plan unchanged. The audience for custom rules also overlaps strongly with people who can write Rust. DataFusion exposes no FFI bridge for the logical OptimizerRule/AnalyzerRule traits, but it does export FFI_PhysicalOptimizerRule for the physical PhysicalOptimizerRule trait. This commit imports those instead. Changes: * Remove crates/core/src/optimizer_rules.rs, python/datafusion/optimizer.py, python/tests/test_optimizer.py, and the SessionContext.add_optimizer_rule / add_analyzer_rule methods. remove_optimizer_rule is unchanged (pre-existing). * New crates/core/src/physical_optimizer.rs reads a __datafusion_physical_optimizer_rule__ capsule and converts it via Arc<dyn PhysicalOptimizerRule>::from(&FFI_PhysicalOptimizerRule). * SessionContext gains a physical_optimizer_rules constructor argument. Upstream offers no API to add physical rules to a live context, so they are appended to the builder at construction time only. * The datafusion-ffi-example crate gains MyPhysicalOptimizerRule, a counter-backed rule used by _test_physical_optimizer_rule.py to prove the rule fires over FFI during physical planning. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1f51403 commit daa8da9

10 files changed

Lines changed: 262 additions & 488 deletions

File tree

crates/core/src/context.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -375,11 +375,12 @@ pub struct PySessionContext {
375375

376376
#[pymethods]
377377
impl PySessionContext {
378-
#[pyo3(signature = (config=None, runtime=None))]
378+
#[pyo3(signature = (config=None, runtime=None, physical_optimizer_rules=None))]
379379
#[new]
380380
pub fn new(
381381
config: Option<PySessionConfig>,
382382
runtime: Option<PyRuntimeEnvBuilder>,
383+
physical_optimizer_rules: Option<Vec<Bound<'_, PyAny>>>,
383384
) -> PyDataFusionResult<Self> {
384385
let config = if let Some(c) = config {
385386
c.config
@@ -392,11 +393,20 @@ impl PySessionContext {
392393
RuntimeEnvBuilder::default()
393394
};
394395
let runtime = Arc::new(runtime_env_builder.build()?);
395-
let session_state = SessionStateBuilder::new()
396+
let mut state_builder = SessionStateBuilder::new()
396397
.with_config(config)
397398
.with_runtime_env(runtime)
398-
.with_default_features()
399-
.build();
399+
.with_default_features();
400+
// DataFusion exposes no FFI bridge for the logical optimizer or
401+
// analyzer, so only physical optimizer rules can be supplied from
402+
// another library. They are appended after the default rules at
403+
// construction time; there is no upstream API to add them to a live
404+
// `SessionContext`.
405+
for rule in physical_optimizer_rules.unwrap_or_default() {
406+
let rule = crate::physical_optimizer::physical_optimizer_rule_from_pyobject(&rule)?;
407+
state_builder = state_builder.with_physical_optimizer_rule(rule);
408+
}
409+
let session_state = state_builder.build();
400410
let ctx = Arc::new(SessionContext::new_with_state(session_state));
401411
Ok(PySessionContext {
402412
ctx,
@@ -1145,18 +1155,6 @@ impl PySessionContext {
11451155
self.ctx.remove_optimizer_rule(name)
11461156
}
11471157

1148-
pub fn add_optimizer_rule(&self, rule: Bound<'_, PyAny>) -> PyResult<()> {
1149-
let adapter = crate::optimizer_rules::build_optimizer_rule(rule)?;
1150-
self.ctx.add_optimizer_rule(adapter);
1151-
Ok(())
1152-
}
1153-
1154-
pub fn add_analyzer_rule(&self, rule: Bound<'_, PyAny>) -> PyResult<()> {
1155-
let adapter = crate::optimizer_rules::build_analyzer_rule(rule)?;
1156-
self.ctx.add_analyzer_rule(adapter);
1157-
Ok(())
1158-
}
1159-
11601158
pub fn table_provider(&self, name: &str, py: Python) -> PyResult<PyTable> {
11611159
let provider = wait_for_future(py, self.ctx.table_provider(name))
11621160
// Outer error: runtime/async failure

crates/core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ pub mod expr;
4545
#[allow(clippy::borrow_deref_ref)]
4646
mod functions;
4747
pub mod metrics;
48-
pub mod optimizer_rules;
4948
mod options;
49+
pub mod physical_optimizer;
5050
pub mod physical_plan;
5151
mod pyarrow_filter_expression;
5252
pub mod pyarrow_util;

crates/core/src/optimizer_rules.rs

Lines changed: 0 additions & 168 deletions
This file was deleted.
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Imports physical optimizer rules supplied by another library over FFI.
19+
//!
20+
//! DataFusion has no FFI bridge for the logical [`OptimizerRule`] /
21+
//! [`AnalyzerRule`] traits, but it does export
22+
//! [`FFI_PhysicalOptimizerRule`] for the physical
23+
//! [`PhysicalOptimizerRule`] trait. A producer crate (typically a separate
24+
//! compiled extension) exposes an object with a
25+
//! ``__datafusion_physical_optimizer_rule__`` method returning a
26+
//! :class:`PyCapsule` that wraps an [`FFI_PhysicalOptimizerRule`]. This
27+
//! module reads that capsule and converts it into an
28+
//! ``Arc<dyn PhysicalOptimizerRule>`` so it can be registered with a
29+
//! [`SessionContext`](datafusion::prelude::SessionContext) at construction
30+
//! time.
31+
//!
32+
//! [`OptimizerRule`]: datafusion::optimizer::optimizer::OptimizerRule
33+
//! [`AnalyzerRule`]: datafusion::optimizer::analyzer::AnalyzerRule
34+
35+
use std::ptr::NonNull;
36+
use std::sync::Arc;
37+
38+
use datafusion::physical_optimizer::PhysicalOptimizerRule;
39+
use datafusion_ffi::physical_optimizer::FFI_PhysicalOptimizerRule;
40+
use pyo3::prelude::*;
41+
use pyo3::types::PyCapsule;
42+
43+
use crate::errors::{PyDataFusionError, PyDataFusionResult, to_datafusion_err};
44+
45+
/// Convert a Python object exposing ``__datafusion_physical_optimizer_rule__``
46+
/// into an ``Arc<dyn PhysicalOptimizerRule>`` by reading its FFI capsule.
47+
pub(crate) fn physical_optimizer_rule_from_pyobject(
48+
obj: &Bound<'_, PyAny>,
49+
) -> PyDataFusionResult<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
50+
if !obj.hasattr("__datafusion_physical_optimizer_rule__")? {
51+
return Err(PyDataFusionError::Common(
52+
"Expected physical optimizer rule object to define \
53+
__datafusion_physical_optimizer_rule__()"
54+
.to_string(),
55+
));
56+
}
57+
58+
let capsule = obj
59+
.getattr("__datafusion_physical_optimizer_rule__")?
60+
.call0()?;
61+
let capsule = capsule.cast::<PyCapsule>().map_err(to_datafusion_err)?;
62+
let data: NonNull<FFI_PhysicalOptimizerRule> = capsule
63+
.pointer_checked(Some(c"datafusion_physical_optimizer_rule"))?
64+
.cast();
65+
let ffi_rule = unsafe { data.as_ref() };
66+
67+
Ok(Arc::<dyn PhysicalOptimizerRule + Send + Sync>::from(
68+
ffi_rule,
69+
))
70+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import pyarrow as pa
21+
from datafusion import SessionContext
22+
from datafusion_ffi_example import MyPhysicalOptimizerRule
23+
24+
25+
def _setup_session_with_rule() -> tuple[SessionContext, MyPhysicalOptimizerRule]:
26+
rule = MyPhysicalOptimizerRule()
27+
ctx = SessionContext(physical_optimizer_rules=[rule])
28+
batch = pa.RecordBatch.from_arrays(
29+
[pa.array([1, 2, 3])],
30+
names=["a"],
31+
)
32+
ctx.register_record_batches("t", [[batch]])
33+
return ctx, rule
34+
35+
36+
def test_ffi_physical_optimizer_rule_runs_during_planning():
37+
"""A rule supplied via physical_optimizer_rules is invoked while the
38+
physical plan is built, and the query still returns correct results."""
39+
ctx, rule = _setup_session_with_rule()
40+
41+
before = rule.optimize_calls()
42+
result = ctx.sql("SELECT a FROM t").collect()
43+
after = rule.optimize_calls()
44+
45+
assert after > before, (
46+
f"Expected user FFI physical optimizer rule to fire, "
47+
f"before={before} after={after}"
48+
)
49+
assert result[0].column(0).to_pylist() == [1, 2, 3]
50+
51+
52+
def test_ffi_physical_optimizer_rule_export():
53+
"""The rule object exposes the FFI capsule entry point."""
54+
rule = MyPhysicalOptimizerRule()
55+
capsule = rule.__datafusion_physical_optimizer_rule__()
56+
assert capsule is not None

examples/datafusion-ffi-example/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::catalog_provider::{FixedSchemaProvider, MyCatalogProvider, MyCatalogP
2222
use crate::config::MyConfig;
2323
use crate::logical_extension_codec::MyLogicalExtensionCodec;
2424
use crate::physical_extension_codec::MyPhysicalExtensionCodec;
25+
use crate::physical_optimizer::MyPhysicalOptimizerRule;
2526
use crate::scalar_udf::IsNullUDF;
2627
use crate::table_function::MyTableFunction;
2728
use crate::table_provider::MyTableProvider;
@@ -33,6 +34,7 @@ pub(crate) mod catalog_provider;
3334
pub(crate) mod config;
3435
pub(crate) mod logical_extension_codec;
3536
pub(crate) mod physical_extension_codec;
37+
pub(crate) mod physical_optimizer;
3638
pub(crate) mod scalar_udf;
3739
pub(crate) mod table_function;
3840
pub(crate) mod table_provider;
@@ -55,5 +57,6 @@ fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
5557
m.add_class::<MyConfig>()?;
5658
m.add_class::<MyLogicalExtensionCodec>()?;
5759
m.add_class::<MyPhysicalExtensionCodec>()?;
60+
m.add_class::<MyPhysicalOptimizerRule>()?;
5861
Ok(())
5962
}

0 commit comments

Comments
 (0)