Implement FFI_PhysicalExpr and the structs it needs to support it. #18916
timsaucer merged 9 commits into apache:main
    datafusion-expr = { workspace = true }
    datafusion-functions-aggregate-common = { workspace = true }
    datafusion-physical-expr = { workspace = true }
    datafusion-physical-expr-common = { workspace = true }
In this and the following PRs I am introducing more of these crates, even when they are re-exported in datafusion core crate so that it will have a smaller PR when we remove the core crate at the end of this work epic.
@renato2099 @comphead @paleolimbot @kevinjqliu Would any of you be available to review this PR? I know it is large - I think it's the biggest single PR of the entire FFI update effort. I am particularly interested in whether anyone disagrees that this is the correct approach to take - implementing the [...]. I appreciate any time and thoughts you can spare.
Pull request overview
This PR introduces FFI (Foreign Function Interface) support for PhysicalExpr and its supporting structures, enabling physical expressions to be safely passed across FFI boundaries. This is a foundational change that will eventually allow removal of protobuf usage for physical expressions and improve UDF performance by preserving ColumnarValue::Scalar.
Key changes:
- Implementation of FFI_PhysicalExpr trait with comprehensive wrapper functions for all PhysicalExpr methods
- FFI-stable structures for intervals, distributions, expression properties, columnar values, partitioning, and sort expressions
- Bidirectional conversion support between native DataFusion types and FFI-safe representations
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| datafusion/ffi/src/util.rs | Added FFIResult type alias for FFI return types |
| datafusion/ffi/src/record_batch_stream.rs | Made conversion utilities public for use in physical expression FFI |
| datafusion/ffi/src/physical_expr/sort.rs | Implemented FFI wrapper for physical sort expressions |
| datafusion/ffi/src/physical_expr/partitioning.rs | Implemented FFI wrapper for partitioning schemes |
| datafusion/ffi/src/physical_expr/mod.rs | Core implementation of FFI_PhysicalExpr with wrapper functions and conversions |
| datafusion/ffi/src/lib.rs | Exported new expr and physical_expr modules |
| datafusion/ffi/src/expr/util.rs | Utility functions for scalar value serialization via protobuf |
| datafusion/ffi/src/expr/mod.rs | Module organization for expression-related FFI types |
| datafusion/ffi/src/expr/interval.rs | FFI wrapper for interval arithmetic |
| datafusion/ffi/src/expr/expr_properties.rs | FFI wrappers for expression properties and sort options |
| datafusion/ffi/src/expr/distribution.rs | FFI wrappers for all distribution types |
| datafusion/ffi/src/expr/columnar_value.rs | FFI wrapper for columnar values (arrays and scalars) |
| datafusion/ffi/Cargo.toml | Added dependencies for expression and physical expression crates |
Thanks @timsaucer for the PR. I'm planning to look at it today or tomorrow!
paleolimbot left a comment
Awesome!
All my comments here likely lack some context, so any comments here are more like questions 🙂 .
The only high-level question I have is whether passing filters across an FFI boundary in this way is going to be useful everywhere. For example, our filtering makes extensive use of downcasting to prune row groups out of GeoParquet files and push bounding boxes into spatial indexes: https://github.com/apache/sedona-db/blob/main/rust/sedona-expr/src/spatial_filter.rs#L176-L211 . (That shouldn't block anything here...we may be totally unique and are fully capable of inventing our own FFI if we need to!)
I also wonder if mismatched struct definitions should be considered at some level here (or likely a battle for a different day given there's no precedent for that yet!)
    ColumnarValue::Scalar(v) => {
        FFI_ColumnarValue::Scalar(scalar_value_to_rvec_u8(&v)?)
    }
I left a note in a later section as well, but I think an FFI_ArrowArray of length one is a more flexible choice for passing a scalar across the FFI boundary.
I am not sure what you have in mind @paleolimbot , but I guess one advantage would be that we'd be standardizing both into Arrow arrays which would then simplify everything else (like handling serialization would be the same), right?
What I am not sure about is whether it'd also be an advantage for DataFusion.
This is the same trick we use in datafusion-python for passing scalar values around. I hadn't thought to apply it here, but I think it's a great idea.
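The length-one-array idea from this thread can be sketched with simplified stand-in types. This is only an illustration under stated assumptions: `Columnar`, `WireColumnar`, `to_wire`, and `from_wire` are hypothetical names, and plain `Vec<Option<i32>>` stands in for an Arrow array. It is not the code in this PR.

```rust
// Simplified stand-ins (assumptions): the real types would be ColumnarValue,
// ScalarValue, and an Arrow array passed through the C Data Interface.
#[derive(Debug, Clone, PartialEq)]
pub enum Columnar {
    Array(Vec<Option<i32>>),
    Scalar(Option<i32>),
}

// Hypothetical wire form: always an array, plus a flag telling the receiver
// whether to collapse it back into a scalar.
#[derive(Debug, Clone, PartialEq)]
pub struct WireColumnar {
    pub values: Vec<Option<i32>>,
    pub is_scalar: bool,
}

pub fn to_wire(value: &Columnar) -> WireColumnar {
    match value {
        Columnar::Array(values) => WireColumnar {
            values: values.clone(),
            is_scalar: false,
        },
        // A scalar, including a null scalar, becomes an array of length one.
        Columnar::Scalar(v) => WireColumnar {
            values: vec![*v],
            is_scalar: true,
        },
    }
}

pub fn from_wire(wire: WireColumnar) -> Result<Columnar, String> {
    if !wire.is_scalar {
        return Ok(Columnar::Array(wire.values));
    }
    match wire.values.as_slice() {
        [single] => Ok(Columnar::Scalar(*single)),
        other => Err(format!(
            "expected a length-one array for a scalar, got length {}",
            other.len()
        )),
    }
}
```

With this shape, arrays and scalars share one serialization path, and a null scalar round-trips as a one-element array holding a null.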
    fn try_from(value: &FFI_Distribution) -> Result<Self, Self::Error> {
        match value {
            FFI_Distribution::Uniform(d) => d.try_into(),
It's unclear to me what would happen on version mismatch at this point. Does derive(StableAbi) have a mechanism for loading an FFI enum with an unrecognized variant? I feel as though _ => { /* error communicating that there is a version mismatch */ } would be a nice failure route (can/should the FFI enums be non-exhaustive?)
I'm not sure what would happen on version mismatch here, but more generally version mismatch between stable definitions is very likely going to fail in other places as well. I've been trying to think of the best way to do checks for compatible versions and that's the reason we have crate::version() but I don't have a great way to use it at this point. At rerun we have some code that looks for known compatible versions, but it's not great: https://github.com/rerun-io/rerun/blob/main/rerun_py/rerun_sdk/rerun/catalog/_catalog_client.py#L28
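The fallback arm suggested above can be sketched on a raw-tag representation. Everything here is an assumption for illustration: `DistributionKind`, its three variants, the tag numbering, and `versions_compatible` are hypothetical, and this says nothing about how derive(StableAbi) actually treats unknown variants.

```rust
use std::convert::TryFrom;

// Hypothetical subset of distribution kinds, identified across the boundary
// by a raw tag value (an assumption, not the PR's representation).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DistributionKind {
    Uniform,
    Exponential,
    Gaussian,
}

impl TryFrom<u8> for DistributionKind {
    type Error = String;

    fn try_from(tag: u8) -> Result<Self, Self::Error> {
        match tag {
            0 => Ok(DistributionKind::Uniform),
            1 => Ok(DistributionKind::Exponential),
            2 => Ok(DistributionKind::Gaussian),
            // A tag this build does not know most likely came from a newer
            // library, so report it instead of guessing.
            other => Err(format!(
                "unrecognized distribution tag {}: possible version mismatch",
                other
            )),
        }
    }
}

// Hypothetical compatibility rule: both sides must report the same major version.
pub fn versions_compatible(local_major: u64, foreign_major: u64) -> bool {
    local_major == foreign_major
}
```

A check like `versions_compatible` could run once when a foreign library is loaded, while the fallback arm catches anything that slips through later.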
    #[cfg(test)]
    mod tests {
        use std::hash::{DefaultHasher, Hash, Hasher};
        use std::sync::Arc;

        use arrow::array::{record_batch, BooleanArray, RecordBatch};
        use datafusion_common::{tree_node::DynTreeNode, DataFusionError, ScalarValue};
        use datafusion_expr::{interval_arithmetic::Interval, statistics::Distribution};
        use datafusion_physical_expr::expressions::{Column, NegativeExpr, NotExpr};
        use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExpr};

        use crate::physical_expr::FFI_PhysicalExpr;

        fn create_test_expr() -> (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>) {
Perhaps there is another spot for integration tests that is more appropriate, but I wonder if you could use a physical optimizer rule to replace all (or some) expressions in a planned DataFrame/SQL query and check if it executes.
        hasher.finish()
    }

    unsafe extern "C" fn release_fn_wrapper(expr: &mut FFI_PhysicalExpr) {
Is there any chance a double free can happen if this wrapper is called concurrently?
I think that would be very hard to make happen. The only way I could see that is if someone tried writing a C interface instead of using the Rust approach we have OR they re-implemented the foreign side. The release only happens when the FFI struct is dropped, and the Rust ownership model prevents that from happening in two places for the same struct.
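The release-on-drop argument can be illustrated with a small stand-alone sketch. `FfiHandle`, `RELEASE_COUNT`, and the `String` payload are hypothetical, and nulling the pointer after freeing is a defensive step assumed here, not something confirmed from the PR.

```rust
use std::ffi::c_void;
use std::sync::atomic::{AtomicUsize, Ordering};

// Counts how many times a payload is actually freed; for demonstration only.
pub static RELEASE_COUNT: AtomicUsize = AtomicUsize::new(0);

// Hypothetical FFI handle: opaque private data plus a release callback.
#[repr(C)]
pub struct FfiHandle {
    private_data: *mut c_void,
    release: unsafe extern "C" fn(&mut FfiHandle),
}

unsafe extern "C" fn release_fn_wrapper(handle: &mut FfiHandle) {
    // Free the payload once, then null the pointer so that a second call
    // (for example from a hand-written C caller) becomes a no-op.
    if !handle.private_data.is_null() {
        unsafe { drop(Box::from_raw(handle.private_data as *mut String)) };
        handle.private_data = std::ptr::null_mut();
        RELEASE_COUNT.fetch_add(1, Ordering::SeqCst);
    }
}

impl FfiHandle {
    pub fn new(payload: String) -> Self {
        FfiHandle {
            private_data: Box::into_raw(Box::new(payload)) as *mut c_void,
            release: release_fn_wrapper,
        }
    }
}

impl Drop for FfiHandle {
    fn drop(&mut self) {
        // Drop runs at most once per value, and the compiler rejects any use
        // of a handle after it has been moved, so safe Rust cannot reach this
        // twice for the same handle.
        let release = self.release;
        unsafe { release(self) }
    }
}
```

Moving a handle transfers ownership without running `Drop`, so the callback fires only when the final owner goes out of scope.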
About testing: should we consider adding ABI compatibility tests (version + layout evolution)? That would help protect us from accidental ABI drift. We could use abi_stable::abi_stability::get_type_layout + check_layout_compatibility
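A much cruder stand-in for the layout test proposed above can be written with only the standard library. It does not call the abi_stable functions named in the comment; instead it pins the size and alignment of one struct. `FfiSortOptions`, its fields, and the expected constants are hypothetical.

```rust
use std::mem::{align_of, size_of};

// Hypothetical FFI-visible struct; field order and types are part of the ABI.
#[repr(C)]
pub struct FfiSortOptions {
    pub descending: bool,
    pub nulls_first: bool,
}

// Layout recorded when the ABI was last reviewed (assumed values for this sketch).
pub const EXPECTED_SIZE: usize = 2;
pub const EXPECTED_ALIGN: usize = 1;

pub fn check_layout() -> Result<(), String> {
    let size = size_of::<FfiSortOptions>();
    let align = align_of::<FfiSortOptions>();
    if size != EXPECTED_SIZE || align != EXPECTED_ALIGN {
        return Err(format!(
            "FfiSortOptions layout drifted: size {} (expected {}), align {} (expected {})",
            size, EXPECTED_SIZE, align, EXPECTED_ALIGN
        ));
    }
    Ok(())
}
```

A check like this only catches changes in size or alignment. It would miss reordered fields of equal size, which is the kind of drift a full type-layout comparison is meant to detect.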
    for original in [
        ColumnarValue::Array(array),
        ColumnarValue::Scalar(ScalarValue::Int32(Some(1))),
maybe we could also consider a null scalar since that is a bit of an edge case
let original = ScalarValue::Int32(None);
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
comphead left a comment
Thanks @timsaucer and @paleolimbot @renato2099 for the review!
Which issue does this PR close?
Addresses part of #18671 but does not close it.
Rationale for this change
This PR exposes the PhysicalExpr trait via FFI. This will allow us to stop using protobuf for transferring physical expressions across the FFI boundary. We will still use protobuf for the logical side.
This is important because it will allow us to eventually remove the core crate as described in #18671, and it will also enable keeping ColumnarValue::Scalar when using UDFs, which matters for UDF performance.
Of all the PRs I have prepared for #18671, this is the largest individual one, because it requires quite a few supporting structures from datafusion-expr.
What changes are included in this PR?
- FFI_PhysicalExpr trait and a variety of enums and structs that need to be FFI stable in order to implement it. It does not yet replace the existing usage in the UDFs and other places with FFI_PhysicalExpr. That comes in a later PR, to keep the size of the individual requests manageable.
- FFIResult<T>, which is an alias for RResult<T, RString>
- DataFusionError
Are these changes tested?
Unit tests are included.

Are there any user-facing changes?
Since this is a pure addition, there are no user-facing changes in this PR.