[RFC] Add lambda support and array_transform udf #18921
gstvg wants to merge 26 commits into apache:main from
Conversation
# Traversing Expr trees with a schema that includes lambda parameters

The parameters of a lambda aren't present in the schema of the plan they belong to. During tree traversals that use a schema to check an expression's datatype, nullability, and metadata, there must be a way to access a schema that includes those parameters.

Expr tree traversal with the wrong schema:

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ !! missing "i", incorrect "b" type !! ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ !! missing "i", incorrect "b" type !! ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ !! missing "i", incorrect "b" type !! ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
## Option 1. Per-lambda schema with a new set of TreeNode methods: *_with_schema

This PR adds another set of TreeNode-like methods on logical and physical expressions. While traversing an expression tree, when these methods find a ScalarUDF with a lambda among its arguments, they use ScalarUDF::lambdas_parameters to create a schema adjusted for each of its arguments, and pass that schema as an argument to the visiting/transforming function.

impl Expr {
pub fn transform_with_schema<
F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>,
>(
self,
schema: &DFSchema,
f: F,
) -> Result<Transformed<Self>> {}
}

Example usage:

pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
let mut has_placeholder = false;
// Provide the schema as the first argument.
// The transforming closure receives an adjusted_schema argument
self.transform_with_schema(schema, |mut expr, adjusted_schema| {
match &mut expr {
// Default to assuming the arguments are the same type
Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => {
// use adjusted_schema and not schema. These expressions may contain
// columns referring to a lambda parameter, whose Field is only
// available in adjusted_schema, not in schema
rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?;
rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?;
}
....

To add the lambda parameters to the schema, we need to take a DFSchema invariant into account: "Unqualified fields must be unique not only amongst themselves, but also must have a distinct name from any qualified field names". Since lambda parameters are always unqualified, they may conflict with columns of the outer schema, even when those columns are qualified. To fix this conflict, we can either:

1: Replace the existing column with the lambda parameter, at the same index in the schema's vec of fields, so that the indices of the columns to its right don't change. That's the current approach in this PR.

Expr tree traversal with adjusted schema, replacing conflicts:

+------------------------------------------------------------------+
| array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) |
| |
| a = Int32 |
| b = List(List(Int32)) |
| c = Int32 |
+------------------------------------------------------------------+
|
|
v
+------------------------------------------------------------------+
| (b, i) -> array_transform(b, b -> b + c + i) |
| |
| a = Int32 |
| b = List(Int32) ! replaced ! |
| c = Int32 |
| i = Int32 |
+------------------------------------------------------------------+
|
|
v
+------------------------------------------------------------------+
| array_transform(b, b -> b + c + i) |
| |
| a = Int32 |
| b = List(Int32) ! replaced ! |
| c = Int32 |
| i = Int32 |
+------------------------------------------------------------------+
|
|
v
+------------------------------------------------------------------+
| b -> b + c + i |
| |
| a = Int32 |
| b = Int32 ! replaced ! |
| c = Int32 |
| i = Int32 |
+------------------------------------------------------------------+
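A minimal sketch of this replace-in-place adjustment (illustrative only, not the PR's actual code: the helper name is made up, and plain arrow Fields stand in for DFSchema's qualified fields):

```rust
use std::sync::Arc;
use arrow::datatypes::Field;

/// Build the field list of a per-lambda adjusted schema: overwrite any outer
/// field whose name conflicts with a lambda parameter in place, so every
/// column keeps its index, and append non-conflicting parameters at the end.
fn adjust_fields(outer: &[Arc<Field>], params: &[Arc<Field>]) -> Vec<Arc<Field>> {
    let mut fields: Vec<Arc<Field>> = outer.to_vec();
    for param in params {
        if let Some(existing) = fields.iter_mut().find(|f| f.name() == param.name()) {
            // conflict: replace at the same index instead of appending
            *existing = Arc::clone(param);
        } else {
            fields.push(Arc::clone(param));
        }
    }
    fields
}
```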
2: Rename the shadowed column to a unique, non-conflicting name and add the lambda parameter to the end of the schema's vec of fields. This option allows checking whether a physical column refers to a lambda parameter by checking whether its index is greater than or equal to the number of fields of the outer schema (see the sketch after the diagram below). When this information is available, it eliminates the need for the with_lambdas_params variations of the TreeNode methods. It's trivial to change the PR to use this.

Expr tree traversal with adjusted schema, renaming conflicts:

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b_shadowed1 = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ b = List(Int32) ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b_shadowed1 = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ b = List(Int32) ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ a = Int32 ╷
╷ b_shadowed1 = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ b_shadowed2 = List(Int32) ╷
╷ b = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
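A sketch of the index-based check this rename-and-append layout enables (illustrative):

```rust
/// With rename-and-append, lambda parameters are always appended after the
/// outer schema's fields, so a physical column refers to a lambda parameter
/// exactly when its index falls past the outer field count.
fn is_lambda_parameter(column_index: usize, outer_field_count: usize) -> bool {
    column_index >= outer_field_count
}
```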
Lambdas are usually evaluated with a different number of rows than the outer scope, as in the example, where array_transform is executed with one row and its lambda with two rows, one for each element of the array. The UDF implementation is responsible for adjusting the captured columns to the number of rows of its parameters, with whatever logic makes sense to it. For array_transform, that means copying the value of the captured column once for each element of the arrays (a sketch of this adjustment follows the diagram below). This adjustment is costly, so the implementation needs a way to avoid adjusting uncaptured columns. It's intuitive to just remove the uncaptured columns, but note in the diagram and in the query below that this can change the index of captured columns: the "c" column has index 2 in the outer scope but ends up with index 1 in the other scopes.

Expr tree traversal with a schema with uncaptured columns removed:

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ a@0 = Int32 ╷
╷ b@1 = List(List(Int32)) ╷
╷ c@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ b@0 = List(Int32) ╷
╷ c@1 = Int32 ╷
╷ i@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ b@0 = List(Int32) ╷
╷ c@1 = Int32 ╷
╷ i@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ ╷
╷ b@0 = Int32 ╷
╷ c@1 = Int32 ╷
╷ i@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
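As mentioned before the diagram, a minimal sketch of the captured-column adjustment array_transform needs, repeating each outer value once per list element with arrow's take kernel (illustrative, not the PR's implementation):

```rust
use arrow::array::{Array, ArrayRef, ListArray, UInt32Array};
use arrow::compute::take;
use arrow::error::ArrowError;

/// Expand a captured outer column to the lambda's row count: for each outer
/// row `i`, repeat captured[i] once per element of lists[i].
fn adjust_captured(captured: &ArrayRef, lists: &ListArray) -> Result<ArrayRef, ArrowError> {
    let mut indices = Vec::new();
    for row in 0..lists.len() {
        let elements = lists.value_length(row) as usize;
        indices.extend(std::iter::repeat(row as u32).take(elements));
    }
    take(captured.as_ref(), &UInt32Array::from(indices), None)
}
```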
The example query annotated with the physical index each column ends up with in each scope:

select a@0, b@1, c@2, array_transform(b@0, (b@0, i@2) -> array_transform(b@0, b@0 -> b@0 + c@1 + i@2)) from t;

## Option 1a: Nullify uncaptured columns

To keep the indices stable, this PR doesn't remove uncaptured columns: they are still present in the adjusted schema, with their original types, during tree traversals with the new _with_schema methods. However, to avoid the costly adjustment, when they are passed to the UDF in invoke_with_args they are substituted with columns of the Null datatype (see the sketch after the diagram below).

Expr execution/evaluation RecordBatch schema with uncaptured columns substituted with Null columns:

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Null ! nullified ! ╷
╷ b = List(Int32) ╷
╷ c = Int32 ╷
╷ i = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Null ! nullified ! ╷
╷ b = List(Int32) ╷
╷ c = Int32 ╷
╷ i = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ ╷
╷ a = Null ! nullified ! ╷
╷ b = Int32 ╷
╷ c = Int32 ╷
╷ i = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
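A minimal sketch of that Null substitution (hypothetical helper, not the PR's actual code):

```rust
use std::collections::HashSet;
use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::DataType;

/// Before invoking the UDF, replace every uncaptured column with a Null-typed
/// array of the same length: indices stay stable, and nothing is copied for
/// columns the lambda body never reads.
fn nullify_uncaptured(
    columns: Vec<ArrayRef>,
    captured: &HashSet<usize>,
    num_rows: usize,
) -> Vec<ArrayRef> {
    columns
        .into_iter()
        .enumerate()
        .map(|(i, column)| {
            if captured.contains(&i) {
                column
            } else {
                new_null_array(&DataType::Null, num_rows)
            }
        })
        .collect()
}
```

A Null-typed array carries no data buffers, only a length, so the placeholder is cheap regardless of the row count.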
## Option 1b: TreeNode *_with_indices_mapping

To avoid keeping uncaptured columns in the schema and substituting them with null in the batch, it's possible to add yet another set of TreeNode-like methods on physical expressions that call the visiting/transforming function with a second parameter of type HashMap<usize, usize>, mapping the indices of the current scope to the ones from the outermost scope. This requires that, before the visiting/transforming function is called for a physical lambda expression, its whole subtree be visited to collect all the captured columns and build the indices mapping. Inner lambdas require the process again and can't reuse the work of the outer lambda. This may be costly for lambdas with complex and/or highly nested expressions.

impl PhysicalExprExt for Arc<dyn PhysicalExpr> {
fn transform_with_indices_mapping<
F: FnMut(Self, &HashMap<usize, usize>) -> Result<Transformed<Self>>,
>(
self,
mut f: F,
) -> Result<Transformed<Self>> {}
}

Expr tree traversal with indices_mapping: "c" has index 2 in the root scope but 1 in the others:

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ indices_mapping = {} ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ indices_mapping = { 1 => 2 } ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ indices_mapping = { 1 => 2 } ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ ╷
╷ indices_mapping = { 1 => 2 } ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
The code in minimize_join_filter would change to:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
let mut used_columns = HashSet::new();
expr.apply_with_indices_mapping(|expr, indices_mapping| {
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
// this column may be a child of a lambda, where its index refers to the
// lambda-scoped schema, which won't include uncaptured columns from the
// plan input, and therefore may differ from the indices of the schema of
// the input plan. In such cases, indices_mapping contains the mapping to
// the index of the input plan; if a mapping is not found, the column
// should be one referring to a lambda parameter
let scoped_index = col.index();
if let Some(plan_index) = indices_mapping.get(&scoped_index) {
    used_columns.insert(*plan_index);
}
}
Ok(TreeNodeRecursion::Continue)
})
...
}

## Option 2. Create a schema with all parameters from all lambdas for tree traversals

Use a secondary schema containing all parameters from all lambdas. For that, expressions must be transformed, normalizing all lambda parameters and their references with a unique qualifier per lambda, so they can coexist without conflicts in this schema. A qualifier field would be added to the lambda expr:

pub struct Lambda {
pub qualifier: Option<String>,
pub params: Vec<String>,
pub body: Box<Expr>,
}

Schema of the example:

From my understanding of this video, this is similar to what DuckDB does in its binder, although with differences in the evaluation part. I didn't find any other resource on other open source engines with lambda support, like ClickHouse and Spark.

This works well when dealing with plan nodes, where, during plan creation or schema recomputation, we can normalize the plan's lambdas, create the extended schema and save it as a plan field, exposing it with a method like "lambda_extended_schema", although with an added cost to plan creation/schema recomputation. The lambda normalization actually requires two passes: a first one to collect any existing lambda qualifiers, so they aren't reused in the second, normalizing pass.

How the code would change:

//from
expr.transform_with_schema(plan.schema(), |node, adjusted_schema| ...)
//to
let schema = plan.lambda_extended_schema();
expr.transform(|node| ...)

Another example:

impl LogicalPlan {
pub fn replace_params_with_values(
self,
param_values: &ParamValues,
) -> Result<LogicalPlan> {
self.transform_up_with_subqueries(|plan| {
// use plan.lambda_extended_schema(), which contains lambda parameters,
// instead of plan.schema(), which won't
let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema());
let name_preserver = NamePreserver::new(&plan);
plan.map_expressions(|e| {
// if this expression is a child of a lambda and contains columns referring
// to its parameters, the lambda_extended_schema already contains them
let (e, has_placeholder) = e.infer_placeholder_types(&lambda_extended_schema)?;
....

However, when working with functions/methods that deal directly with expressions, detached from a plan, the expression's lambdas may be unnormalized and the extended schema is unavailable. There are a few public methods/functions like that, infer_placeholder_types for example:

impl Expr {
pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
let mut has_placeholder = false;
self.transform(|mut expr| ...)
...
}
}

It could either:

1: Require being called only with normalized expressions, with the schema argument being the extended schema, and return an error otherwise. This is restrictive and puts strain on users.

2: Allow being called with unnormalized expressions: visit the whole expr tree collecting the existing lambda qualifiers so the next step doesn't produce duplicates, perform a first transformation to guarantee that the expression's lambdas are normalized, create the extended schema, and only then perform the second transformation to infer the placeholder types using the extended schema (a sketch of the qualifier generation appears after the diagram below). While it can document that the returned expression is normalized, it's still a regular Expr, which doesn't encode that property in its type. Also, without changing the method signature, it wouldn't even be possible to return the extended schema so that other places could reuse it without recomputation; the work is costly and cannot be reused.

Normalized example:

select t.a, t.b, array_transform(t.b, (lambda1.b, lambda1.i) -> array_transform(lambda1.b, lambda2.b -> lambda2.b + t.a + lambda1.i)) from t;

Just like the first option, this also sets uncaptured columns to Null, as well as unavailable/out-of-scope lambda parameters.

Expr tree batch evaluation with a single extended schema:

┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ t.a = Int32 ╷
╷ t.b = List(List(Int32)) ╷
╷ t.c = Int32 ╷
╷ lambda1.b = Null ╷
╷ lambda1.i = Null ╷
╷ lambda2.b = Null ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ t.a = Null ╷
╷ t.b = Null ╷
╷ t.c = Int32 ╷
╷ lambda1.b = List(Int32) ╷
╷ lambda1.i = Int32 ╷
╷ lambda2.b = Null ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ t.a = Null ╷
╷ t.b = Null ╷
╷ t.c = Int32 ╷
╷ lambda1.b = List(Int32) ╷
╷ lambda1.i = Int32 ╷
╷ lambda2.b = Null ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ t.a = Null ╷
╷ t.b = Null ╷
╷ t.c = Int32 ╷
╷ lambda1.b = Null ╷
╷ lambda1.i = Int32 ╷
╷ lambda2.b = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
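As referenced in option 2's two-pass normalization, a minimal sketch of generating unique lambda qualifiers (hypothetical helper; the naming scheme is illustrative):

```rust
use std::collections::HashSet;

/// Pick a fresh qualifier that doesn't collide with any qualifier collected
/// during the first pass, recording it so later lambdas can't reuse it.
fn fresh_qualifier(used: &mut HashSet<String>) -> String {
    let mut n = used.len() + 1;
    loop {
        let candidate = format!("lambda{n}");
        if used.insert(candidate.clone()) {
            return candidate;
        }
        n += 1;
    }
}
```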
With this option, indices are always stable across the whole tree. It also allows checking whether a physical column refers to a lambda parameter by checking whether its index is greater than or equal to the number of fields of the outer schema; when this information is available, it eliminates the need for the with_lambdas_params variations of the TreeNode methods.

Comparison between options:
## Splitting this into smaller PRs

If it's decided that this PR should move forward, it will likely be as smaller PRs. In that case, I have already planned a division, shown below. It's not necessary to analyze the graph, as it doesn't help with the discussion in this text; it's included just to show a reasonable granularity of smaller PRs I could find, in case that helps decide whether to move this forward or not. Each rectangular node is a PR. Asymmetrical nodes are collections of smaller PRs which share the same dependencies and aren't a dependency of any other PR. Green ones can be opened immediately. Gray ones have unmet dependencies. Merged PRs will be colored blue.

graph LR
classDef blue fill:blue
classDef green fill:green
subgraph "SQL" [SQL 84LOC]
SQLP[SQL Parse 65LOC]
SQLU[SQL Unparse 19LOC]
end
LL[Logical Expr::Lambda 100LOC]:::green
LL --> CSE[CSE 50LOC]
P[Plan logical lambda into physical expr 25LOC]
UDF1["Extend ScalarUDF[Impl] 300LOC"]
UDF2[ScalarUDF lambda schema helpers 100LOC]
AM[Non functional array_transform 270LOC]
PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]
PL --> PSFL
UDF1 --> PSFL
UDF1 --> AM
%%CILP[Column::is_lambda_parameter 6LOC]
subgraph "Expr::*_with_schema"
LTN[Expr::*_with_schema API def 50LOC]:::green
LTNB[make Expr::*_with_schema lambda aware 70LOC]
LTN --> LTNES[Expr Simplifier 100LOC]
LTNA>"
use Expr::*_with_schema in existing code
Type Coercion 110LOC
normalize_col[with_schemas_and_ambiguity_check] 31LOC
SessionState::create_physical_expr 4LOC
Expr::infer_placeholder_type 1LOC
ApplyFunctionRewrites 10LOC
optimize_projections::rewrite_expr 2LOC
SqlToRel::try_process_group_by_unnest 10LOC
sql resolve_columns 5LOC
Example type_coercion_demo 10LOC
"]
end
PL[Physical Lambda Expr 108LOC]:::green
subgraph "PhysicalExpr::*_with_schema"
PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
PTNA>"
use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
PhysicalExprSimplifier 20LOC
unwrap_cast_in_comparison 10LOC
AsyncMapper::find_references 1LOC
Example CustomCastsPhysicalExprAdapter 10LOC
"]
PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
end
subgraph capture
CS[Capture support 30LOC]
LCH["List capture helpers 60LOC(4x15)"]
MCH["Merge capture with arguments helper 66LOC(2x33)"]
AMCT[array_transform capture support 20LOC]
end
PSFL --> CS
PTN --> CS
LL --> SQL
LL --> P
UDF2 --> LTNES
UDF2 --> LTNB
UDF2 --> PTNB
LTN --> LTNA
LTN --> LTNB
LL --> LTNB
LTN --> UDF2
LL --> UDF1 --> UDF2
UDF2 --> P
PL --> P
PL --> PTNB
PTN --> PTNB
PTN --> PTNA
AM --> AMCT
CS --> AMCT
LCH --> AMCT
MCH --> AMCT
LL --> LTNLP2
subgraph "Expr::*_with_lambdas_params"
LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
AAAAA>"
expr_applicable_for_cols 5LOC
Expr::add_column_refs[_counts]/any_column_refs 15LOC
expr_to_columns 10LOC
columnize_expr 15LOC
find_columns_referenced_by_expr 10LOC
find_column_indexes_referenced_by_expr 5LOC
normalize_col 10LOC
normalize_col_with_schemas_and_ambiguity_check 15LOC
replace_col 10LOC
transform_up_with_lambdas_params 10LOC
filter_exprs_evaluation_result_on_empty_batch 10LOC
replace_cols_by_name 10LOC
optimizer::push_down_filter::contain 15LOC
ScalarSubqueryToJoin 40LOC
TableAliasRewriter 20LOC
Example ShreddedJsonRewriter 30LOC
"]
end
PL --> PTNLP2
subgraph "PhysicalExpr::*_with_lambdas_params"
PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]
PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
BBBBB>"
CustomPhysicalExprAdapter 3LOC
ParquetPushdownChecker 13LOC
add_offset_to_expr 3LOC
update_expr 10LOC
project_ordering 20LOC
with_new_schema 10LOC
collect_columns 10LOC
reassign_expr_columns 10LOC
DefaultPhysicalExprAdapter 40LOC
projection pushdown 50LOC
sort pushdown 40LOC
projection 50LOC
stream_join_utils 40LOC
pruning predicate rewrite_column_expr 5LOC
Example DefaultValuePhysicalExprAdapter 20LOC
"]
end
Thanks a lot @gstvg, a huge amount of work apparently (even if I am not skilled enough to judge). I have been waiting for such functions for a very long time. IMHO, this is a MUST HAVE in DataFusion. All serious analytics engines provide this kind of function: the DuckDB implementation is very mature, and Polars seems to have it as well, even if I haven't tried it yet. I really hope that DataFusion will jump on this enhancement very quickly, as it is again a big miss. Use cases like list_filter(list, expr), list_map(list, expr) and list_reduce(list, base, expr) will then become available. Again, thanks a lot for your work, and I cross my fingers for quick progress. |
|
@gstvg This is super exciting! I'll review over the next couple of days. cc @SparkApplicationMaster @andygrove would be great to get your thoughts too 😃 |
|
Thanks @fbx31. This is high priority for me right now. I also hope we can finish this quickly. It's indeed a lot of work, but now I believe it ended up that way because I went too fast in the wrong direction... More on that in the comment below. Thanks again! |
|
Thanks @shehabgamin. I analyzed the Spark implementation one last time, and realized I could have done it similarly from the beginning... I've already tested it locally and intend to push soon; the diff is reduced to ~2K LOC, mostly in new, self-contained code, with small changes in existing code and without requiring changes in downstream code. I hope you haven't put time into reviewing it yet, because I believe the Spark approach is much better and also easier/faster to review.

Basically, for planning, we lazily set the Field in the lambda column itself, instead of injecting it into a DFSchema:

//logical
struct LambdaColumn {
name: String,
field: Option<FieldRef>,
}

And for evaluation, again, we lazily set the value of the lambda column instead of injecting it into a RecordBatch:

//physical
struct LambdaColumn {
name: String,
field: FieldRef,
value: Option<ColumnarValue>,
}

I think I initially ruled that out because other expressions don't contain their own Field, and this had been discussed before in #12604* and didn't move forward. The difference is that those other expressions' Fields naturally exist in the plan schema, which I now believe justifies the distinction. I will push soon and update the PR description, thanks again!

*This is being discussed again in #18845 |
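For illustration only (an assumption about how the lazy value could be consumed, not necessarily the PR's code), evaluating such a physical LambdaColumn could simply return whatever value the enclosing UDF injected:

```rust
use arrow::datatypes::FieldRef;
use datafusion_common::DataFusionError;
use datafusion_expr::ColumnarValue;

struct LambdaColumn {
    name: String,
    // carries the parameter's resolved type (not consulted in this sketch)
    field: FieldRef,
    value: Option<ColumnarValue>,
}

impl LambdaColumn {
    /// Return the injected value, erroring if the column is evaluated
    /// outside a lambda invocation, where no value has been set yet.
    fn evaluate(&self) -> Result<ColumnarValue, DataFusionError> {
        self.value.clone().ok_or_else(|| {
            DataFusionError::Internal(format!(
                "lambda parameter {} evaluated without an injected value",
                self.name
            ))
        })
    }
}
```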
|
Looks like the latest substrait release is not backward compatible (for a patch change) and it's breaking 52.2 builds: #20756 @benbellick. If you agree, perhaps we should pin the substrait version to the previous one in datafusion for the DF 53 release? |
|
👋 Hello from the @substrait-io side of things! I am excited to get this one in! As part of trying to help, I'm working on getting a branch off of this one working with substrait. To bring in lambda support, I had to merge current main (which has lambda-containing I also may have some questions that come up as I am implementing things. Thanks! |
| } | ||
|
|
||
pub fn captures(&self) -> &HashSet<usize> {
    self.captures.get_or_init(|| {
Can having a OnceLock here lead to significant performance savings? I don't think lambda body trees are usually very large.
Yeah, I micro-optimized. I moved a modified version of this traversal to lambda creation, so the OnceLock isn't necessary anymore. But we can also remove it from creation and rerun it for every batch, just let me know!
60cabc0#diff-02472981b6b2f1755e8a017d46d809d63f3605c29b26ec5124d281977de3a8dcR99-R101
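For context, a minimal sketch of the lazy-capture pattern under discussion (the struct name and traversal details are assumptions drawn from the snippet above, not the PR's code):

```rust
use std::collections::HashSet;
use std::sync::OnceLock;

struct PhysicalLambda {
    // ... lambda body, parameters, etc.
    captures: OnceLock<HashSet<usize>>,
}

impl PhysicalLambda {
    /// Compute the set of captured column indices at most once, on first use.
    fn captures(&self) -> &HashSet<usize> {
        self.captures.get_or_init(|| {
            // walk the lambda body collecting referenced column indices (elided)
            HashSet::new()
        })
    }
}
```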
|
Hey @gstvg, we would like to review this PR in more detail but it's very dense as it is. Do you think it would make sense to split it into independent parts to make reviewing easier? wdyt?
Each could build on top of the previous PR and we could keep track of each of them in the original issue/epic -- This is just a rough idea! |
|
@benbellick I actually merged with main two days ago (15/03), but only pushed today (17/03), really sorry for your wasted time. I was adding more tests and comments, and now I will explore substrait too on a new branch. Please do ask anything that comes up, I will do my best to answer quickly.

@LiaCastaneda I would really like to split this. I started with a complete implementation just to make sure we wouldn't require major reworks in the future. |
|
@LiaCastaneda I pushed 6f2c92b removing Column and outer variable capture support. It removed ~1300 lines from the PR. What do you think about also deferring type_coercion to a future PR? |
Makes sense, maybe keep array_transform but with basic support only? Single-parameter lambdas (x -> x * 2), no index parameter, no column capture, so it's possible to add sqllogictest coverage for those cases showing e2e coverage. I think type coercion should stay in this PR too, I feel it's more about correctness than a feature. |
This PR adds support for lambdas with column capture, and the array_transform function used to test the lambda implementation.

Example usage:
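The example block itself is collapsed in this rendering; a representative query of the supported shape (illustrative only; the lambda parameter v and table t are borrowed from the discussion later in this description):

```sql
select array_transform(t.a, v -> v + 1) from t;
```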
Some comments on code snippets in this doc show what value each struct, variant or field would hold after planning the first example above. Some literals are simplified pseudocode.
3 new Expr variants are added: LambdaFunction, owning a new trait LambdaUDF, which is like a ScalarFunction/ScalarUDFImpl with support for lambdas; Lambda, for the lambda body and its parameter names; and LambdaVariable, which is like Column but for lambda parameters. The reasoning for not using Column instead comes later in this doc.

Their logical representations:
The example would be planned into a tree like this:
The physical counterparts definition:
Note: For those who primarily want to check whether this lambda implementation supports their use case and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of LambdaUDF and the array_transform implementation of the relevant LambdaUDF methods, collapsed due to their size.

Physical planning implementation is trivial:
The added LambdaUDF trait is almost a clone of ScalarUDFImpl, with the exception of:

- return_field_from_args and invoke_with_args, where args.args is now a list of enums with two variants, Value or Lambda, instead of a list of values
- lambdas_parameters, which returns a Field for each supported parameter of every lambda argument, based on the Fields of the non-lambda arguments
- return_field and the deprecated is_nullable and display_name
array_transform lambdas_parameters implementation
array_transform return_field_from_args implementation
array_transform invoke_with_args implementation
How relevant LambdaUDF methods would be called and what they would return during planning and evaluation of the example
A pair LambdaUDF/LambdaUDFImpl like ScalarFunction was not used because those exist only to maintain backwards compatibility with the older API #8045
Why LambdaVariable and not Column:

Existing tree traversals that operate on columns would break if some column nodes referenced a lambda parameter and not a real column. In the example query, projection pushdown would try to push the lambda parameter "v", which won't exist in table "t".
Furthermore, the implemention of
ExprSchemableandPhysicalExpr::return_fieldforColumnexpects that the schema it receives as a argument contains an entry for its name, which is not the case for lambda parameters.By including a
FieldRefonLambdaVariablethat should be resolved either during construction time, as in the sql planner, or later by theresolve_lambdas_variablesmethod onExprandLogicalPlan,ExprSchemableandPhysicalExpr::return_fieldsimply return it's own Field:LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation
For reference, Spark and Substrait also use a specialized node instead of a regular column
There's also discussions on making every expr own it's type: #18845, #12604
Possible fixes discarded due to complexity, requiring downstream changes and implementation size:
How minimize_join_filter would look:
For any given LambdaFunction found during the traversal, a new schema is created for each lambda argument that contains its parameters, returned from LambdaUDF::lambdas_parameters.
How it would look: