Skip to content

[RFC] Add lambda support and array_transform udf#18921

Draft
gstvg wants to merge 26 commits intoapache:mainfrom
gstvg:lambda4
Draft

[RFC] Add lambda support and array_transform udf#18921
gstvg wants to merge 26 commits intoapache:mainfrom
gstvg:lambda4

Conversation

@gstvg
Copy link
Contributor

@gstvg gstvg commented Nov 25, 2025

This PR adds support for lambdas with column capture and the array_transform function used to test the lambda implementation.

Example usage:

CREATE TABLE t as SELECT 2 as n;

SELECT array_transform([2, 3], v -> v != t.n) from t;

[false, true]

-- arbitrally nested lambdas are also supported
SELECT array_transform([[[2, 3]]], m -> array_transform(m, l -> array_transform(l, v -> v*2)));

[[[4, 6]]]

Some comments on code snippets of this doc show what value each struct, variant or field would hold after planning the first example above. Some literals are simplified pseudo code

3 new Expr variants are added, LambdaFunction, owing a new trait LambdaUDF, which is like a ScalarFunction/ScalarUDFImpl with support for lambdas, Lambda, for the lambda body and it's parameters names, and LambdaVariable, which is like Column but for lambdas parameters. The reasoning why not using Column instead is later on this doc.

Their logical representations:

enum Expr {
    LambdaFunction(LambdaFunction), // array_transform([2, 3], v -> v != t.n)
    Lambda(Lambda), // v -> v != t.n
    LambdaVariable(LambdaVariable), // v, of the lambda body: v != t.n
   ...
}

// array_transform([2, 3], v -> v != t.n)
struct LambdaFunction {
    pub func: Arc<dyn LambdaUDF>, // global instance of array_transform
    pub args: Vec<Expr>, // [Expr::ScalarValue([2, 3]), Expr::Lambda(v -> v != n)]
}

// v -> v != t.n
struct Lambda {
    pub params: Vec<String>, // ["v"]
    pub body: Box<Expr>, // v != n
}

// v, of the lambda body: v != t.n
struct LambdaVariable {
    pub name: String, // "v"
    pub field: FieldRef, // Field::new("", DataType::Int32, false)
    pub spans: Spans,
}

The example would be planned into a tree like this:

LambdaFunctionExpression
  name: array_transform
  children:
    1. ListExpression [2,3]
    2. LambdaExpression
         parameters: ["v"]
         body:
            ComparisonExpression (!=)
              left:
                 LambdaVariableExpression("v", Some(Field::new("", Int32, false)))
              right:
                 ColumnExpression("t.n")

The physical counterparts definition:

struct LambdaFunctionExpr {
    fun: Arc<dyn LambdaUDF>, // global instance of array_transform
    name: String, // "array_transform"
    args: Vec<Arc<dyn PhysicalExpr>>, // [LiteralExpr([2, 3], LambdaExpr("v -> v != t.n"))]
    return_field: FieldRef, // Field::new("", DataType::new_list(DataType::Boolean, false), false)
    config_options: Arc<ConfigOptions>, 
}


struct LambdaExpr {
    params: Vec<String>, // ["v"]
    body: Arc<dyn PhysicalExpr>, // v -> v != t.n
}

struct LambdaVariable {
    name: String, // "v", of the lambda body: v != t.n
    field: FieldRef, // Field::new("", DataType::Int32, false)
}

Note: For those who primarly wants to check if this lambda implementation supports their usecase and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of LambdaUDF and the array_transform implementation of LambdaUDF relevant methods, collapsed due to their size

Physical planning implementation is trivial:
fn create_physical_expr(
    e: &Expr,
    input_dfschema: &DFSchema,
    execution_props: &ExecutionProps,
) -> Result<Arc<dyn PhysicalExpr>> {
    let input_schema = input_dfschema.as_arrow();

    match e {
        ...
        Expr::LambdaFunction(LambdaFunction { func, args}) => {
            let physical_args =
                create_physical_exprs(args, input_dfschema, execution_props)?;

            Ok(Arc::new(LambdaFunctionExpr::try_new(
                Arc::clone(func),
                physical_args,
                input_schema,
                config_options: ... // irrelevant
            )?))
        }
        Expr::Lambda(Lambda { params, body }) => Ok(Arc::new(LambdaExpr::new(
            params.clone(),
            create_physical_expr(body, input_dfschema, execution_props)?,
        ))),
        Expr::LambdaVariable(LambdaVariable {
            name,
            field,
            spans: _,
        }) => lambda_variable(
            name,
            Arc::clone(field),
        ),
    }
}

The added LambdaUDF trait is almost a clone of ScalarUDFImpl, with the exception of:

  1. return_field_from_args and invoke_with_args, where now args.args is a list of enums with two variants: Value or Lambda instead of a list of values
  2. the addition of lambdas_parameters, which return a Field for each parameter supported for every lambda argument based on the Field of the non lambda arguments
  3. the removal of return_field and the deprecated ones is_nullable and display_name.
  4. Not yet includes analogues to the methods preimage, placement, evaluate_bounds, propagate_constraints, output_ordering and preserves_lex_ordering
LambdaUDF
trait LambdaUDF {
    /// Returns a list of the same size as args where each value is the logic below applied to value at the correspondent position in args:
    ///
    /// If it's a value, return None
    /// If it's a lambda, return the list of all parameters that that lambda supports
    ///
    /// Example for array_transform:
    ///
    /// `array_transform([2.0, 8.0], v -> v > 4.0)`
    ///
    /// ```ignore
    /// let lambdas_parameters = array_transform.lambdas_parameters(&[
    ///      ValueOrLambdaParameter::Value(Field::new("", DataType::new_list(DataType::Float32, false)))]), // the Field of the literal `[2, 8]`
    ///      ValueOrLambdaParameter::Lambda, // A lambda
    /// ]?;
    ///
    /// assert_eq!(
    ///      lambdas_parameters,
    ///      vec![
    ///         // it's a value, return None
    ///         None,
    ///         // it's a lambda, return it's supported parameters, regardless of how many are actually used
    ///         Some(vec![
    ///             // the value being transformed
    ///             Field::new("", DataType::Float32, false),
    ///             // the 1-based index being transformed, not used on the example above,
    ///             //but implementations doesn't need to care about it
    ///             Field::new("", DataType::Int32, false),
    ///         ])
    ///      ]
    /// )
    /// ```
    ///
    /// The implementation can assume that some other part of the code has coerced
    /// the actual argument types to match [`Self::signature`].
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambda<FieldRef, ()>],
    ) -> Result<Vec<Option<Vec<Field>>>>;
    fn return_field_from_args(&self, args: LambdaReturnFieldArgs) -> Result<FieldRef>;
    fn invoke_with_args(&self, args: LambdaFunctionArgs) -> Result<ColumnarValue>;
   // ... omitted methods that are similar in ScalarUDFImpl
}

/// An argument to a lambda function
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ValueOrLambda<V, L> {
    /// A value with associated data
    Value(V),
    /// A lambda with associated data
    Lambda(L),
}

/// Information about arguments passed to the function
///
/// This structure contains metadata about how the function was called
/// such as the type of the arguments, any scalar arguments and if the
/// arguments can (ever) be null
///
/// See [`LambdaUDF::return_field_from_args`] for more information
#[derive(Clone, Debug)]
pub struct LambdaReturnFieldArgs<'a> {
    /// The data types of the arguments to the function
    ///
    /// If argument `i` to the function is a lambda, it will be the field of the result of the
    /// lambda if evaluated with the parameters returned from [`LambdaUDF::lambdas_parameters`]
    ///
    /// For example, with `array_transform([1], v -> v == 5)`
    /// this field will be `[
    ///     ValueOrLambda::Value(Field::new("", DataType::List(DataType::Int32), false)),
    ///     ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false))
    /// ]`
    pub arg_fields: &'a [ValueOrLambda<FieldRef, FieldRef>],
    /// Is argument `i` to the function a scalar (constant)?
    ///
    /// If the argument `i` is not a scalar, it will be None
    ///
    /// For example, if a function is called like `array_transform([1], v -> v == 5)`
    /// this field will be `[Some(ScalarValue::List(...), None]`
    pub scalar_arguments: &'a [Option<&'a ScalarValue>],
}

/// Arguments passed to [`LambdaUDF::invoke_with_args`] when invoking a
/// lambda function.
#[derive(Debug, Clone)]
pub struct LambdaFunctionArgs {
    /// The evaluated arguments and lambdas to the function
    pub args: Vec<ValueOrLambda<ColumnarValue, LambdaArgument>>,
    /// Field associated with each arg, if it exists
    /// For lambdas, it will be the field of the result of
    /// the lambda if evaluated with the parameters
    /// returned from [`LambdaUDF::lambdas_parameters`]
    pub arg_fields: Vec<ValueOrLambda<FieldRef, FieldRef>>,
    /// The number of rows in record batch being evaluated
    pub number_rows: usize,
    /// The return field of the lambda function returned
    /// (from `return_field_from_args`) when creating the
    /// physical expression from the logical expression
    pub return_field: FieldRef,
    /// The config options at execution time
    pub config_options: Arc<ConfigOptions>,
}

/// A lambda argument to a LambdaFunction
#[derive(Clone, Debug)]
pub struct LambdaArgument {
    /// The parameters defined in this lambda
    ///
    /// For example, for `array_transform([2], v -> -v)`,
    /// this will be `vec![Field::new("v", DataType::Int32, true)]`
    params: Vec<FieldRef>,
    /// The body of the lambda
    ///
    /// For example, for `array_transform([2], v -> -v)`,
    /// this will be the physical expression of `-v`
    body: Arc<dyn PhysicalExpr>,
    /// A RecordBatch containing at least the captured columns inside this lambda body, if any
    /// Note that it may contain additional, non-specified columns, but that's a implementation detail
    ///
    /// For example, for `array_transform([2], v -> v + a + b)`,
    /// this will be a `RecordBatch` with at least two columns, `a` and `b`
    captures: Option<RecordBatch>,
}

impl LambdaArgument {
    /// Evaluate this lambda
    /// `args` should evalute to the value of each parameter
    /// of the correspondent lambda returned in [LambdaUDF::lambdas_parameters].
    ///
    /// `adjust` should adjust the captured columns of this
    /// lambda, if any, relative to it's parameters
    ///
    /// Tip: For adjusting multiple arrays by indices, use [`take_arrays`]
    ///
    /// [`take_arrays`]: arrow::compute::take_arrays
    pub fn evaluate(
        &self,
        args: &[&dyn Fn() -> Result<ArrayRef>],
        mut adjust: impl FnMut(&[ArrayRef]) -> Result<Vec<ArrayRef>>,
    ) -> Result<ColumnarValue> {
        let adjusted_captures = self
            .captures
            .as_ref()
            .map(|captures| {
                let adjusted_columns = adjust(captures.columns())?;

                RecordBatch::try_new(captures.schema(), adjusted_columns)
            })
            .transpose()?;

        let merged = merge_captures_with_variables(
            adjusted_captures.as_ref(),
            &self.params,
            args,
        )?;

        self.body.evaluate(&merged)
    }
}
array_transform lambdas_parameters implementation
fn value_lambda_pair<'a, V: Debug, L: Debug>(
    name: &str,
    args: &'a [ValueOrLambda<V, L>],
) -> Result<(&'a V, &'a L)> {
    let [value, lambda] = take_function_args(name, args)?;

    let (ValueOrLambda::Value(value), ValueOrLambda::Lambda(lambda)) = (value, lambda)
    else {
        return plan_err!(
            "{name} expects a value followed by a lambda, got {value:?} and {lambda:?}"
        );
    };

    Ok((value, lambda))
}

impl LambdaUDF for ArrayTransform {
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambda<FieldRef, ()>],
    ) -> Result<Vec<Option<Vec<Field>>>> {
        let (list, _lambda) = value_lambda_pair(self.name(), args)?;

        let (field, index_type) = match list.data_type() {
            DataType::List(field) => (field, DataType::Int32),
            DataType::LargeList(field) => (field, DataType::Int64),
            DataType::FixedSizeList(field, _) => (field, DataType::Int32),
            _ => return plan_err!("expected list, got {list}"),
        };

        // we don't need to omit the index in the case the lambda don't specify, e.g. array_transform([], v -> v*2),
        // nor check whether the lambda contains more than two parameters, e.g. array_transform([], (v, i, j) -> v+i+j),
        // as datafusion will do that for us
        let value = Field::new("", field.data_type().clone(), field.is_nullable())
            .with_metadata(field.metadata().clone());
        let index = Field::new("", index_type, false);

        Ok(vec![None, Some(vec![value, index])])
    }
}
array_transform return_field_from_args implementation
impl LambdaUDF for ArrayTransform {
    fn return_field_from_args(&self, args: LambdaReturnFieldArgs) -> Result<Arc<Field>> {
        let (list, lambda) = value_lambda_pair(self.name(), args.arg_fields)?;

        //TODO: should metadata be copied into the transformed array?

        // lambda is the resulting field of executing the lambda body
        // with the parameters returned in lambdas_parameters
        let field = Arc::new(Field::new(
            Field::LIST_FIELD_DEFAULT_NAME,
            lambda.data_type().clone(),
            lambda.is_nullable(),
        ));

        let return_type = match list.data_type() {
            DataType::List(_) => DataType::List(field),
            DataType::LargeList(_) => DataType::LargeList(field),
            DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size),
            other => plan_err!("expected list, got {other}")?,
        };

        Ok(Arc::new(Field::new("", return_type, list.is_nullable())))
    }
}
array_transform invoke_with_args implementation
impl LambdaUDF for ArrayTransform {
    fn invoke_with_args(&self, args: LambdaFunctionArgs) -> Result<ColumnarValue> {
        let (list, lambda) = value_lambda_pair(self.name(), &args.args)?;

        let list_array = list.to_array(args.number_rows)?;

        // as per list_values docs, if list_array is sliced, list_values will be sliced too,
        // so before constructing the transformed array below, we must adjust the list offsets with
        // adjust_offsets_for_slice
        let list_values = list_values(&list_array)?;

        // if any column got captured, we need to adjust it to the values arrays,
        // duplicating values of list with mulitple values and removing values of empty lists
        // list_values_row_number is not cheap so is important to avoid it when no column is captured
        let mut adjust_indices = None;

        // by passing closures, lambda.evaluate can evaluate only those actually needed
        let values_param = || Ok(Arc::clone(&list_values));
        let indices_param = || list_values_index(&list_array);

        // call the transforming lambda
        let transformed_values = lambda
            .evaluate(&[&values_param, &indices_param], |arrays| {
                let indices = match &adjust_indices {
                    Some(v) => v,
                    None => adjust_indices.insert(list_values_row_number(&list_array)?),
                };
                Ok(take_arrays(arrays, indices, None)?)
            })?
            .into_array(list_values.len())?;

        let field = match args.return_field.data_type() {
            DataType::List(field)
            | DataType::LargeList(field)
            | DataType::FixedSizeList(field, _) => Arc::clone(field),
            _ => {
                return exec_err!(
                    "{} expected ScalarFunctionArgs.return_field to be a list, got {}",
                    self.name(),
                    args.return_field
                );
            }
        };

        let transformed_list = match list_array.data_type() {
            DataType::List(_) => {
                let list = list_array.as_list();
                // since we called list_values above which would return sliced values for
                // a sliced list, we must adjust the offsets here as otherwise they would be invalid
                let adjusted_offsets = adjust_offsets_for_slice(list);

                Arc::new(ListArray::new(
                    field,
                    adjusted_offsets,
                    transformed_values,
                    list.nulls().cloned(),
                )) as ArrayRef
            }
            DataType::LargeList(_) => {
                let large_list = list_array.as_list();
                // since we called list_values above which would return sliced values for
                // a sliced list, we must adjust the offsets here as otherwise they would be invalid
                let adjusted_offsets = adjust_offsets_for_slice(large_list);

                Arc::new(LargeListArray::new(
                    field,
                    adjusted_offsets,
                    transformed_values,
                    large_list.nulls().cloned(),
                ))
            }
            DataType::FixedSizeList(_, value_length) => {
                Arc::new(FixedSizeListArray::new(
                    field,
                    *value_length,
                    transformed_values,
                    list_array.as_fixed_size_list().nulls().cloned(),
                ))
            }
            other => exec_err!("expected list, got {other}")?,
        };

        Ok(ColumnarValue::Array(transformed_list))
    }
}
How relevant LambdaUDF methods would be called and what they would return during planning and evaluation of the example
// this is called at sql planning
let lambdas_parameters = lambda_udf.lambdas_parameters(&[
    ValueOrLambda::Value(Field::new("", DataType::new_list(DataType::Int32, false), false)), // the Field of the [2, 3] literal
    ValueOrLambda::Lambda(()), // A unspecified lambda. On the example, v -> v != t.n
])?;

assert_eq!(
    lambdas_parameters,
    vec![
            // the [2, 3] argument, not a lambda so no parameters
            None,
            // the parameters that *can* be declared on the lambda, and not only 
            // those actually declared: the implementation doesn't need to care 
            // about it
            Some(vec![
                Field::new("", DataType::Int32, false), // the list inner value
                Field::new("", DataType::Int32, false), // the 1-based index of the element being transformed
            ])]
);



// this is called every time ExprSchemable is called on a LambdaFunction
let return_field = array_transform.return_field_from_args(&LambdaReturnFieldArgs {
    arg_fields: &[
        ValueOrLambda::Value(Field::new("", DataType::new_list(DataType::Int32, false), false)),
        ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false)), // the return_field of the expression "v != t.n" when "v" is of the type returned in lambdas_parameters
    ],
    scalar_arguments // irrelevant
})?;

assert_eq!(return_field, Field::new("", DataType::new_list(DataType::Boolean, false), false));



let value = array_transform.evaluate(&LambdaFunctionArgs {
    args: vec![
        ValueOrLambda::Value(List([2, 3])),
        ValueOrLambda::Lambda(LambdaArgument of `v -> v != t.n`),
    ],
    arg_fields, // same as above
    number_rows: 1,
    return_field, // same as above
    config_options, // irrelevant
})?;

assert_eq!(value, BooleanArray::from([false, true]))


A pair LambdaUDF/LambdaUDFImpl like ScalarFunction was not used because those exist only to maintain backwards compatibility with the older API #8045


Why LambdaVariable and not Column:

Existing tree traversals that operate on columns would break if some column nodes referenced to a lambda parameter and not a real column. In the example query, projection pushdown would try to push the lambda parameter "v", which won't exist in table "t".

Example of code of another traversal that would break:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // if this is a lambda column, this function will break
            used_columns.insert(col.index());
        }
        Ok(TreeNodeRecursion::Continue)
    });
    ...
}

Furthermore, the implemention of ExprSchemable and PhysicalExpr::return_field for Column expects that the schema it receives as a argument contains an entry for its name, which is not the case for lambda parameters.

By including a FieldRef on LambdaVariable that should be resolved either during construction time, as in the sql planner, or later by the resolve_lambdas_variables method on Expr and LogicalPlan, ExprSchemable and PhysicalExpr::return_field simply return it's own Field:

LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation
impl ExprSchemable for Expr {
   fn to_field(
        &self,
        schema: &dyn ExprSchema,
    ) -> Result<(Option<TableReference>, Arc<Field>)> {
        let (relation, schema_name) = self.qualified_name();
        let field = match self {
           Expr::LambdaVariable(l) => Ok(Arc::clone(&l.field.ok_or_else(|| plan_err!("Unresolved LambdaVariable {}", l.name)))),
           ...
        }?;

        Ok((
            relation,
            Arc::new(field.as_ref().clone().with_name(schema_name)),
        ))
    }
    ...
}

impl PhysicalExpr for LambdaVariable {
    fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
        Ok(Arc::clone(&self.field))
    }
    ...
}

For reference, Spark and Substrait also use a specialized node instead of a regular column

There's also discussions on making every expr own it's type: #18845, #12604

Possible fixes discarded due to complexity, requiring downstream changes and implementation size:
  1. Add a new set of TreeNode methods that provides the set of lambdas parameters names seen during the traversal, so column nodes can be tested if they refer to a regular column or to a lambda parameter. Any downstream user that wants to support lambdas would need use those methods instead of the existing ones. This also would add 1k+ lines to the PR.
impl Expr {
    pub fn transform_with_lambdas_params<
        F: FnMut(Self, &HashSet<String>) -> Result<Transformed<Self>>,
    >(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {}
}

How minimize_join_filter would looks like:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply_with_lambdas_params(|expr, lambdas_params| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // dont include lambdas parameters
            if !lambdas_params.contains(col.name()) {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
  1. Add a flag to the Column node indicating if it refers to a lambda parameter. Still requires checking for it on existing tree traversals that works on Columns (30+) and also downstream.
//logical
struct Column {
    pub relation: Option<TableReference>,
    pub name: String,
    pub spans: Spans,
    pub is_lambda_parameter: bool,
}

//physical
struct Column {
    name: String,
    index: usize,
    is_lambda_parameter: bool,
}

How minimize_join_filter would look like:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // dont include lambdas parameters
            if !col.is_lambda_parameter {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
  1. Add a new set of TreeNode methods that provides a schema that includes the lambdas parameters for the scope of the node being visited/transformed:
impl Expr {
    pub fn transform_with_schema<
        F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>,
    >(
        self,
        schema: &DFSchema,
        f: F,
    ) -> Result<Transformed<Self>> { ... }
    ... other methods
}

For any given LambdaFunction found during the traversal, a new schema is created for each lambda argument that contains it's parameter, returned from LambdaUDF::lambdas_parameters
How it would look like:

pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
        let mut has_placeholder = false;
        // Provide the schema as the first argument. 
        // Transforming closure receive an adjusted_schema as argument
        self.transform_with_schema(schema, |mut expr, adjusted_schema| {
            match &mut expr {
                // Default to assuming the arguments are the same type
                Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => {
                    // use adjusted_schema and not schema. Those expressions may contain 
                    // columns referring to a lambda parameter, which Field would only be
                    // available in adjusted_schema and not in schema
                    rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?;
                    rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?;
                }
    ....
  1. Make available trought LogicalPlan and ExecutionPlan nodes a schema that includes all lambdas parameters from all expressions owned by the node, and use this schema for tree traversals. For nodes which won't own any expression, the regular schema can be returned
impl LogicalPlan {
    fn lambda_extended_schema(&self) -> &DFSchema;
}

trait ExecutionPlan {
    fn lambda_extended_schema(&self) -> &DFSchema;
}

//usage
impl LogicalPlan {
    pub fn replace_params_with_values(
            self,
            param_values: &ParamValues,
        ) -> Result<LogicalPlan> {
            self.transform_up_with_subqueries(|plan| {
                // use plan.lambda_extended_schema() containing lambdas parameters
                // instead of plan.schema() which wont
                let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema());
                let name_preserver = NamePreserver::new(&plan);
                plan.map_expressions(|e| {
                    // if this expression is child of lambda and contain columns referring it's parameters
                    // the lambda_extended_schema already contain them
                    let (e, has_placeholder) = e.infer_placeholder_types(&lambda_extended_schema)?;
    ....

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions physical-expr Changes to the physical-expr crates optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) substrait Changes to the substrait crate catalog Related to the catalog crate common Related to common crate proto Related to proto crate functions Changes to functions implementation datasource Changes to the datasource crate ffi Changes to the ffi crate physical-plan Changes to the physical-plan crate spark labels Nov 25, 2025
@gstvg
Copy link
Contributor Author

gstvg commented Nov 25, 2025

Outdated # Traversing Expr trees with a schema that include lambdas parameters

The parameters of a lambda aren't present in the schema of the plan they belong to. During tree traversals that use a schema to check expressions datatype, nullability and metadata, there must be a way to access a schema which includes those parameters.

Expr tree traversal with wrong schema
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                         a = Int32                                ╷        
╷                         b = List(List(Int32))                    ╷        
╷                         c = Int32                                ╷        
╷                                                                  ╷        
╷              !! missing "i", incorrect "b" type !!               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
╷              !! missing "i", incorrect "b" type !!               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                          a = Int32                               ╷        
╷                          b = List(List(Int32))                   ╷        
╷                          c = Int32                               ╷        
╷                                                                  ╷        
╷              !! missing "i", incorrect "b" type !!               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

Option 1. Per-lambda schema with a new set of TreeNode methods: *_with_schema

Once again, this PR adds another set of TreeNode-like methods on logical and physical expressions, that while traversing expression trees, when they find a ScalarUDF that contains a lambda on its arguments, uses ScalarUDF::lambdas_parameters to create a schema adjusted for each of its arguments, and pass it as an argument to the visiting/transforming function.

impl Expr {
pub fn transform_with_schema<
        F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>,
    >(
        self,
        schema: &DFSchema,
        f: F,
    ) -> Result<Transformed<Self>> {}
}

Example usage:

pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
        let mut has_placeholder = false;
        // Provide the schema as the first argument. 
        // Transforming closure receive an adjusted_schema as argument
        self.transform_with_schema(schema, |mut expr, adjusted_schema| {
            match &mut expr {
                // Default to assuming the arguments are the same type
                Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => {
                    // use adjusted_schema and not schema. Those expressions may contain 
                    // columns referring to a lambda parameter, which Field would only be
                    // available in adjusted_schema and not in schema
                    rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?;
                    rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?;
                }
    ....

In order to add the lambda parameters to schema, we need to take into account DFSchema properties:

"Unqualified fields must be unique not only amongst themselves, but also must have a distinct name from any qualified field names"

Since lambdas parameters are always unqualified, they may conflict with columns of the outer schema, even though those being qualified. To fix this conflict, we can either:

1: Replace the existing column with the lambda parameter, in the same index of the vec of fields of the schema, in order to not change the index of columns to the right of it. That's the current approach in this PR

Expr tree traversal with adjusted schema, replacing conflicts
 +------------------------------------------------------------------+  
 | array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) |  
 |                                                                  |  
 |                         a = Int32                                |  
 |                         b = List(List(Int32))                    |  
 |                         c = Int32                                |  
 +------------------------------------------------------------------+  
                                   |                                   
                                   |                                   
                                   v                                   
 +------------------------------------------------------------------+  
 |             (b, i) -> array_transform(b, b -> b + c + i)         |  
 |                                                                  |  
 |                         a = Int32                                |  
 |                         b = List(Int32)  ! replaced !            |  
 |                         c = Int32                                |  
 |                         i = Int32                                |  
 +------------------------------------------------------------------+  
                                   |                                   
                                   |                                   
                                   v                                   
 +------------------------------------------------------------------+  
 |                 array_transform(b, b -> b + c + i)               |  
 |                                                                  |  
 |                         a = Int32                                |  
 |                         b = List(Int32)   ! replaced !           |  
 |                         c = Int32                                |  
 |                         i = Int32                                |  
 +------------------------------------------------------------------+  
                                   |                                   
                                   |                                   
                                   v                                   
 +------------------------------------------------------------------+ 
 |                          b -> b + c + i                          | 
 |                                                                  | 
 |                           a = Int32                              | 
 |                           b = Int32     ! replaced !             | 
 |                           c = Int32                              | 
 |                           i = Int32                              | 
 +------------------------------------------------------------------+ 
                                                                      


2: Rename the shadowed column to an unique, non-conflicting name and add the lambda parameter to the end of the vec of fields of the schema. This option allows checking if a physical column refers to a lambda parameter by checking if its index is greater or equal than the number of fields of the outer schema. When this information is available, it eliminates the need to use the with_lambdas_params variations of TreeNode methods. It's trivial to change the PR to use this.

Expr tree traversal with adjusted schema, renaming conflicts
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         a = Int32                                ╷        
╷                         b_shadowed1 = List(List(Int32))          ╷        
╷                         c = Int32                                ╷        
╷                         b = List(Int32)                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b_shadowed1 = List(List(Int32))           ╷        
╷                        c = Int32                                 ╷        
╷                        b = List(Int32)                           ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                          a = Int32                               ╷        
╷                          b_shadowed1 = List(List(Int32))         ╷        
╷                          c = Int32                               ╷        
╷                          b_shadowed2 = List(Int32)               ╷        
╷                          b = Int32                               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        



Lambdas usually are evaluated with a different number of rows than that of the outer scope, as in the example, where array_transform is executed with one row, and its lambda with two rows, one for each element of the array. The UDF implementation is responsible for adjusting the captured columns with the number of rows of its parameters with whatever logic makes sense to it. For array_transform, its to copy the value of the captured column for each element of the arrays:
        copied once  a [1]------------------> a 1  
                                                   
     copied 2 times  b [2, 3] --------------> b 2  
                               \                   
         not copied  c []       ------------> b 3     
                                                

This adjustment is costly, so it's necessary to provide a way to the implementation to avoid adjusting uncaptured columns.

It's intuitive to just remove the uncaptured columns, but note in the diagram and in the query below that it can change the index of captured columns. The "c" column has index 2 in the outer scope but ends up with index 1 in the others scopes

Expr tree traversal with a schema with uncaptured columns removed
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a@0 = Int32                               ╷        
╷                        b@1 = List(List(Int32))                   ╷        
╷                        c@2 = Int32                               ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         b@0 = List(Int32)                        ╷        
╷                         c@1 = Int32                              ╷        
╷                         i@2 = Int32                              ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         b@0 = List(Int32)                        ╷        
╷                         c@1 = Int32                              ╷        
╷                         i@2 = Int32                              ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                          b@0 = Int32                             ╷        
╷                          c@1 = Int32                             ╷        
╷                          i@2 = Int32                             ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

select a@0, b@1, c@2, array_transform(b@0, (b@0, i@2) -> array_transform(b@0, b@0 -> b@0 + c@1 + i@2)) from t;

Option 1a: Nullify uncaptured columns

To keep the indices stable, this PR won't remove uncaptured columns, as such, they are still present in the adjusted schema with their original type during tree traversals with the new _with_schema methods. However, to avoid the costly adjustment, when they are passed to the UDF in invoke_with_args, they are substituted with columns with the Null datatype.

Expr execution/evaluation RecordBatch schema with uncaptured columns substituted with Null columns
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         a = Null  ! nullified !                  ╷        
╷                         b = List(Int32)                          ╷        
╷                         c = Int32                                ╷        
╷                         i = Int32                                ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Null  ! nullified !                   ╷        
╷                        b = List(Int32)                           ╷        
╷                        c = Int32                                 ╷        
╷                        i = Int32                                 ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                          a = Null  ! nullified !                 ╷        
╷                          b = Int32                               ╷        
╷                          c = Int32                               ╷        
╷                          i = Int32                               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

Option 1b TreeNode *_with_indices_mapping

To avoid keeping uncaptured columns in the schema and substituting them with null in the batch, is possible to add another set of TreeNode-like methods on physical expressions that calls the visiting/transforming function with a second parameter of type HashMap<usize, usize> mapping the indices of the current scope with the ones from the outermost scope. This requires that before calling the visiting/transforming function for a physical lambda expression, all its subtree be visited to collect all the captured columns to build the indices mapping. Inner lambdas require the process again and can't reuse the work of the outer lambda. This may be costly for lambdas with complex expressions and/or highly nested.

impl PhysicalExprExt for Arc<dyn PhysicalExpr> {
    pub fn transform_with_indices_mapping<
        F: FnMut(Self, &HashMap<usize, usize>) -> Result<Transformed<Self>>,
    >(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {}
}
Expr tree traversal with indices_mapping: "c" has index 2 in the root scope but 1 in the others
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        indices_mapping = {}                      ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                     indices_mapping = { 1 => 2 }                 ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                    indices_mapping = { 1 => 2 }                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                    indices_mapping = { 1 => 2 }                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

The code on minimize_join_filter would change to:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply_with_indices_mapping(|expr, indices_mapping| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // this column may be child of a lambda, where this indice would refer to the lambda
            // scoped schema, which won't include uncaptured columns from the plan input,
            // and therefore may differ from the indices of the schema of the input plan
            // In such cases, indices_mapping contain the mapping to the index of the input plan
            // if a mapping is not found, it should be a column referring to a lambda parameter
            let scoped_index = col.index();
            if let Some(plan_index) = indices_mapping.get(scoped_index) {
                used_columns.insert(plan_index);
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}

Option 2. Create a schema with all parameters from all lambdas for tree traversals

Use a secondary schema containing all parameters from all lambdas. For that, expressions must be transformed, normalizing all lambda parameters and its references, with a unique qualifier per lambda, so they can coexist without conflicts in this schema. A qualifier field would be added to the lambda expr

pub struct Lambda {
    pub qualifier: Option<String>,
    pub params: Vec<String>,
    pub body: Box<Expr>,
}

Schema of the example:

t.a: Int32
t.b: List(List(Int32))
lambda1.b: List(Int32)
lambda1.i: UInt32
lambda2.b: Int32

From my understanding of this video, this is similar to what DuckDB does on its binder, although with differences in the evaluation part. I didn't find any other resource for other open source engines with lambda support, like Clickhouse and Spark.

This works well when dealing with plans nodes, where, during plan creation time or schema recomputation, we can normalize its lambdas, create the extended schema and save it as plan field, exposing it with a method like "lambda_extended_schema", although with an added cost to plan creation/schema recomputation. The lambda normalization actually requires two passes, a first to collect any existing lambda qualifier to avoid reusing them in the last, normalizing pass.

How code would look like:

//from
expr.transform_with_schema(plan.schema(), |node, adjusted_schema| ...)
//to
let schema = plan.lambda_extended_schema();
expr.transform(|node| ...)

Another example:

impl LogicalPlan {
    pub fn replace_params_with_values(
            self,
            param_values: &ParamValues,
        ) -> Result<LogicalPlan> {
            self.transform_up_with_subqueries(|plan| {
                // use plan.lambda_extended_schema() containing lambdas parameters
                // instead of plan.schema() which wont
                let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema());
                let name_preserver = NamePreserver::new(&plan);
                plan.map_expressions(|e| {
                    // if this expression is child of lambda and contain columns referring it's parameters
                    // the lambda_extended_schema already contain them
                    let (e, has_placeholder) = e.infer_placeholder_types(&lambda_extended_schema)?;
    ....

However, when working with functions/methods that deal directly with expressions, detached from a plan, the expression lambdas may be unnormalized, and the extended schema is unavailable. There's a few public methods/functions like that, like infer_placeholder_types for example:

impl Expr {
    pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
        let mut has_placeholder = false;
        self.transform(|mut expr| ...)
        ...
    }
}   

It could either:

1: Require to be only called with normalized expressions, and that the schema argument be the extended schema, or return an error otherwise, which is restrictive and put strain on users

2: Allow to be called with unnormalized expressions, visit the whole expr tree collecting the existing lambdas qualifiers to avoid to avoid duplicate qualifiers in the next step, perform a first transformation to guarantee that the expression lambdas are normalized, create the extended schema, for only then perform the second transformation to infer the placeholder types using the extended schema. While it can document that the returned expression is normalized, it's still a regular Expr which doesn't encode that property in its type. Also, without changing the method signature, it wouldn't even be possible to return the extended schema to allow it to be used again in other places without recomputation. This is costly and won't allow reuse of its costly work



Normalized example:

select t.a, t.b, array_transform(t.b, (lambda1.b, lambda1.i) -> array_transform(lambda1.b, lambda2.b -> lambda2.b + t.a + lambda1.i)) from t;

Just like with the first option, this also sets uncaptured columns to Null, as well as unavailable/out-of-scope lambdas parameters.

Expr tree batch evaluation with a single extended schema
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                        t.a = Int32                               ╷        
╷                        t.b = List(List(Int32))                   ╷        
╷                        t.c = Int32                               ╷        
╷                        lambda1.b = Null                          ╷        
╷                        lambda1.i = Null                          ╷        
╷                        lambda2.b = Null                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                        t.a = Null                                ╷        
╷                        t.b = Null                                ╷        
╷                        t.c = Int32                               ╷        
╷                        lambda1.b = List(Int32)                   ╷        
╷                        lambda1.i = Int32                         ╷        
╷                        lambda2.b = Null                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                        t.a = Null                                ╷        
╷                        t.b = Null                                ╷        
╷                        t.c = Int32                               ╷        
╷                        lambda1.b = List(Int32)                   ╷        
╷                        lambda1.i = Int32                         ╷        
╷                        lambda2.b = Null                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                          t.a = Null                              ╷        
╷                          t.b = Null                              ╷        
╷                          t.c = Int32                             ╷        
╷                          lambda1.b = Null                        ╷        
╷                          lambda1.i = Int32                       ╷        
╷                          lambda2.b = Int32                       ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

With this option, indices are always stable across the whole tree

This option allows checking if a physical column refers to a lambda parameter by checking if its index is greater or equal than the number of fields of the outer schema. When this information is available, it eliminates the need to use the with_lambdas_params variations of TreeNode methods.

Comparison between options:

* per-lambda schema with uncaptured columns set to null per-lambda schema with indices_mapping single extended schema
New set of TreeNode methods Yes, 1, _with_schema for both logical and physical expressions Yes, 2, _with_schema for both logical and physical expressions, and _with_indices_mapping for physical expressions No
Tree traversal added cost Only when encountering a lambda Only when encountering a lambda Zero
Plan creation/ recompute schema added cost Zero Zero Always, regardless of existence of any lambda
Code change, internal New set of TreeNode methods and using them instead of the current ones when applicable 2 new set of TreeNode methods and using them instead of the current ones when applicable Untried, unpredictable
Code change, downstream if lambda support is desired, use the new TreeNode methods instead of the current ones when applicable otherwise none if lambda support is desired, use the new TreeNode methods instead of the current ones when applicable otherwise none Variable, medium when closely associated with a Plan, just call plan. lambda_extended_schema() Unpredictable when plan is unavailable or doesn't exist
Change uncaptured columns DataType to Null Yes No Yes
Presence of unneeded Null columns in the schema during planning and optimizing and in the RecordBatch during execution as a padding/filler to keep indices stable Yes No Yes
Stable column indices across the whole expr tree Yes No Yes
Make _with_lambdas_params unnecessary for physical expressions if Expr::Column is used No Yes No

Splitting this into smaller PRs

If this PR is decided to move forward, it will likely be with smaller PRs. In that case, I already planned a division shown below. It's necessary to analyze the graph, as it doesn't help with the discussion of this text, and it's included here just to show a reasonable granularity of smaller PRs I could find in case it helps decide whether to move this forward or not.

Each rectangular node is a PR. Asymmetrical nodes are a collection of smaller PRs which share the same dependencies and aren't a dependency of any other PR. Green ones can be opened immediately. Gray ones contain unmet dependencies. Merged PRs will be colored with blue.

Left-to-Right full-screen link

Details
graph LR
    classDef blue fill:blue
    classDef green fill:green

    subgraph "SQL" [SQL 84LOC]
        SQLP[SQL Parse 65LOC]
        SQLU[SQL Unparse 19LOC]
    end
    
    LL[Logical Expr::Lambda 100LOC]:::green
    LL --> CSE[CSE 50LOC]
    P[Plan logical lambda into physical expr 25LOC]
    UDF1["Extend ScalarUDF[Impl] 300LOC"]
    UDF2[ScalarUDF lambda schema helpers 100LOC]
    AM[Non functional array_transform 270LOC]
    PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]

    PL --> PSFL
    UDF1 --> PSFL
    UDF1 --> AM

    %%CILP[Column::is_lambda_parameter 6LOC]
    
    subgraph "Expr::*_with_schema"
        LTN[Expr::*_with_schema API def 50LOC]:::green
        LTNB[make Expr::*_with_schema lambda aware 70LOC]
        LTN --> LTNES[Expr Simplifier 100LOC]
        LTNA>"
            use Expr::*_with_schema in existing code
            Type Coercion 110LOC
            normalize_col[with_schemas_and_ambiguity_check] 31LOC
            SessionState::create_physical_expr 4LOC
            Expr::infer_placeholder_type 1LOC
            ApplyFunctionRewrites 10LOC
            optmize_projections::rewrite_expr 2LOC
            SqlToRel::try_process_group_by_unnest 10LOC
            sql resolve_columns 5LOC
            Example type_coercion_demo 10LOC
        "]
    end
    
    PL[Physical Lambda Expr 108LOC]:::green
    
    subgraph "PhysicalExpr::*_with_schema"
        PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
        PTNA>"
            use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
            PhysicalExprSimplifier 20LOC
            unwrap_cast_in_comparison 10LOC
            AsyncMapper::find_references 1LOC
            Example CustomCastsPhysicalExprAdapter 10LOC
        "]
        PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
    end
    
    subgraph capture
        CS[Capture support 30LOC]
        LCH["List capture helpers 60LOC(4x15)"]
        MCH["Merge capture with arguments helper 66LOC(2x33)"]
        AMCT[array_transform capture support 20LOC]
    end

    PSFL --> CS
    PTN --> CS
    LL --> SQL
    LL --> P
    UDF2 --> LTNES
    UDF2 --> LTNB
    UDF2 --> PTNB
    LTN --> LTNA
    LTN --> LTNB
    LL --> LTNB
    LTN --> UDF2
    LL --> UDF1 --> UDF2
    UDF2 --> P
    PL --> P
    PL --> PTNB
    PTN --> PTNB
    PTN --> PTNA
    AM --> AMCT
    CS --> AMCT
    LCH --> AMCT
    MCH --> AMCT

    LL --> LTNLP2

subgraph "Expr::*_with_lambdas_params"
        LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
        LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
        AAAAA>"
            expr_applicable_for_cols 5LOC
            Expr::add_column_refs[_counts]/any_column_refs 15LOC
            expr_to_columns 10LOC
            columnize_expr 15LOC
            find_columns_referenced_by_expr 10LOC
            find_column_indexes_referenced_by_expr 5LOC
            normalize_col 10LOC
            normalize_col_with_schemas_and_ambiguity_check 15LOC
            replace_col 10LOC
            transform_up_with_lambdas_params 10LOC
            filter_exprs_evaluation_result_on_empty_batch 10LOC
            replace_cols_by_name 10LOC
            optimizer::push_down_filter::contain 15LOC
            ScalarSubqueryToJoin 40LOC
            TableAliasRewriter 20LOC
            Example ShreddedJsonRewriter 30LOC
        "]
    end

    PL --> PTNLP2

    subgraph "PhysicalExpr::*_with_lambdas_params"
        PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]

        PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
        BBBBB>"
            CustomPhysicalExprAdapter 3LOC
            ParquetPushdownChecker 13LOC
            add_offset_to_expr 3LOC
            update_expr 10LOC
            project_ordering 20LOC
            with_new_schema 10LOC
            collect_columns 10LOC
            reassign_expr_columns 10LOC
            DefaultPhysicalExprAdapter 40LOC
            projection pushdown 50LOC
            sort pushdown 40LOC
            projection 50LOC
            stream_join_utils 40LOC
            pruning predicate rewrite_column_expr 5LOC
            Example DefaultValuePhysicalExprAdapter 20LOC
        "]
    end


Loading


Top-to-Bottom full-screen link

Details
graph TB
    classDef blue fill:blue
    classDef green fill:green

    subgraph "SQL" [SQL 84LOC]
        SQLP[SQL Parse 65LOC]
        SQLU[SQL Unparse 19LOC]
    end
    
    LL[Logical Expr::Lambda 100LOC]:::green
    LL --> CSE[CSE 50LOC]
    P[Plan logical lambda into physical expr 25LOC]
    UDF1["Extend ScalarUDF[Impl] 300LOC"]
    UDF2[ScalarUDF lambda schema helpers 100LOC]
    AM[Non functional array_transform 270LOC]
    PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]

    PL --> PSFL
    UDF1 --> PSFL
    UDF1 --> AM

    %%CILP[Column::is_lambda_parameter 6LOC]
    
    subgraph "Expr::*_with_schema"
        LTN[Expr::*_with_schema API def 50LOC]:::green
        LTNB[make Expr::*_with_schema lambda aware 70LOC]
        LTN --> LTNES[Expr Simplifier 100LOC]
        LTNA>"
            use Expr::*_with_schema in existing code
            Type Coercion 110LOC
            normalize_col[with_schemas_and_ambiguity_check] 31LOC
            SessionState::create_physical_expr 4LOC
            Expr::infer_placeholder_type 1LOC
            ApplyFunctionRewrites 10LOC
            optmize_projections::rewrite_expr 2LOC
            SqlToRel::try_process_group_by_unnest 10LOC
            sql resolve_columns 5LOC
            Example type_coercion_demo 10LOC
        "]
    end
    
    PL[Physical Lambda Expr 108LOC]:::green
    
    subgraph "PhysicalExpr::*_with_schema"
        PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
        PTNA>"
            use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
            PhysicalExprSimplifier 20LOC
            unwrap_cast_in_comparison 10LOC
            AsyncMapper::find_references 1LOC
            Example CustomCastsPhysicalExprAdapter 10LOC
        "]
        PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
    end
    
    subgraph capture
        CS[Capture support 30LOC]
        LCH["List capture helpers 60LOC(4x15)"]
        MCH["Merge capture with arguments helper 66LOC(2x33)"]
        AMCT[array_transform capture support 20LOC]
    end

    PSFL --> CS
    PTN --> CS
    LL --> SQL
    LL --> P
    UDF2 --> LTNES
    UDF2 --> LTNB
    UDF2 --> PTNB
    LTN --> LTNA
    LTN --> LTNB
    LL --> LTNB
    LTN --> UDF2
    LL --> UDF1 --> UDF2
    UDF2 --> P
    PL --> P
    PL --> PTNB
    PTN --> PTNB
    PTN --> PTNA
    AM --> AMCT
    CS --> AMCT
    LCH --> AMCT
    MCH --> AMCT

    LL --> LTNLP2

subgraph "Expr::*_with_lambdas_params"
        LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
        LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
        AAAAA>"
            expr_applicable_for_cols 5LOC
            Expr::add_column_refs[_counts]/any_column_refs 15LOC
            expr_to_columns 10LOC
            columnize_expr 15LOC
            find_columns_referenced_by_expr 10LOC
            find_column_indexes_referenced_by_expr 5LOC
            normalize_col 10LOC
            normalize_col_with_schemas_and_ambiguity_check 15LOC
            replace_col 10LOC
            transform_up_with_lambdas_params 10LOC
            filter_exprs_evaluation_result_on_empty_batch 10LOC
            replace_cols_by_name 10LOC
            optimizer::push_down_filter::contain 15LOC
            ScalarSubqueryToJoin 40LOC
            TableAliasRewriter 20LOC
            Example ShreddedJsonRewriter 30LOC
        "]
    end

    PL --> PTNLP2

    subgraph "PhysicalExpr::*_with_lambdas_params"
        PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]

        PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
        BBBBB>"
            CustomPhysicalExprAdapter 3LOC
            ParquetPushdownChecker 13LOC
            add_offset_to_expr 3LOC
            update_expr 10LOC
            project_ordering 20LOC
            with_new_schema 10LOC
            collect_columns 10LOC
            reassign_expr_columns 10LOC
            DefaultPhysicalExprAdapter 40LOC
            projection pushdown 50LOC
            sort pushdown 40LOC
            projection 50LOC
            stream_join_utils 40LOC
            pruning predicate rewrite_column_expr 5LOC
            Example DefaultValuePhysicalExprAdapter 20LOC
        "]
    end

Loading


Right-to-Left full-screen link

Details
graph RL
    classDef blue fill:blue
    classDef green fill:green

    subgraph "SQL" [SQL 84LOC]
        SQLP[SQL Parse 65LOC]
        SQLU[SQL Unparse 19LOC]
    end
    
    LL[Logical Expr::Lambda 100LOC]:::green
    LL --> CSE[CSE 50LOC]
    P[Plan logical lambda into physical expr 25LOC]
    UDF1["Extend ScalarUDF[Impl] 300LOC"]
    UDF2[ScalarUDF lambda schema helpers 100LOC]
    AM[Non functional array_transform 270LOC]
    PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]

    PL --> PSFL
    UDF1 --> PSFL
    UDF1 --> AM

    %%CILP[Column::is_lambda_parameter 6LOC]
    
    subgraph "Expr::*_with_schema"
        LTN[Expr::*_with_schema API def 50LOC]:::green
        LTNB[make Expr::*_with_schema lambda aware 70LOC]
        LTN --> LTNES[Expr Simplifier 100LOC]
        LTNA>"
            use Expr::*_with_schema in existing code
            Type Coercion 110LOC
            normalize_col[with_schemas_and_ambiguity_check] 31LOC
            SessionState::create_physical_expr 4LOC
            Expr::infer_placeholder_type 1LOC
            ApplyFunctionRewrites 10LOC
            optmize_projections::rewrite_expr 2LOC
            SqlToRel::try_process_group_by_unnest 10LOC
            sql resolve_columns 5LOC
            Example type_coercion_demo 10LOC
        "]
    end
    
    PL[Physical Lambda Expr 108LOC]:::green
    
    subgraph "PhysicalExpr::*_with_schema"
        PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
        PTNA>"
            use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
            PhysicalExprSimplifier 20LOC
            unwrap_cast_in_comparison 10LOC
            AsyncMapper::find_references 1LOC
            Example CustomCastsPhysicalExprAdapter 10LOC
        "]
        PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
    end
    
    subgraph capture
        CS[Capture support 30LOC]
        LCH["List capture helpers 60LOC(4x15)"]
        MCH["Merge capture with arguments helper 66LOC(2x33)"]
        AMCT[array_transform capture support 20LOC]
    end

    PSFL --> CS
    PTN --> CS
    LL --> SQL
    LL --> P
    UDF2 --> LTNES
    UDF2 --> LTNB
    UDF2 --> PTNB
    LTN --> LTNA
    LTN --> LTNB
    LL --> LTNB
    LTN --> UDF2
    LL --> UDF1 --> UDF2
    UDF2 --> P
    PL --> P
    PL --> PTNB
    PTN --> PTNB
    PTN --> PTNA
    AM --> AMCT
    CS --> AMCT
    LCH --> AMCT
    MCH --> AMCT

    LL --> LTNLP2

subgraph "Expr::*_with_lambdas_params"
        LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
        LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
        AAAAA>"
            expr_applicable_for_cols 5LOC
            Expr::add_column_refs[_counts]/any_column_refs 15LOC
            expr_to_columns 10LOC
            columnize_expr 15LOC
            find_columns_referenced_by_expr 10LOC
            find_column_indexes_referenced_by_expr 5LOC
            normalize_col 10LOC
            normalize_col_with_schemas_and_ambiguity_check 15LOC
            replace_col 10LOC
            transform_up_with_lambdas_params 10LOC
            filter_exprs_evaluation_result_on_empty_batch 10LOC
            replace_cols_by_name 10LOC
            optimizer::push_down_filter::contain 15LOC
            ScalarSubqueryToJoin 40LOC
            TableAliasRewriter 20LOC
            Example ShreddedJsonRewriter 30LOC
        "]
    end

    PL --> PTNLP2

    subgraph "PhysicalExpr::*_with_lambdas_params"
        PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]

        PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
        BBBBB>"
            CustomPhysicalExprAdapter 3LOC
            ParquetPushdownChecker 13LOC
            add_offset_to_expr 3LOC
            update_expr 10LOC
            project_ordering 20LOC
            with_new_schema 10LOC
            collect_columns 10LOC
            reassign_expr_columns 10LOC
            DefaultPhysicalExprAdapter 40LOC
            projection pushdown 50LOC
            sort pushdown 40LOC
            projection 50LOC
            stream_join_utils 40LOC
            pruning predicate rewrite_column_expr 5LOC
            Example DefaultValuePhysicalExprAdapter 20LOC
        "]
    end

Loading


Bottom-to-Top full-screen link

Details
graph BT
    classDef blue fill:blue
    classDef green fill:green

    subgraph "SQL" [SQL 84LOC]
        SQLP[SQL Parse 65LOC]
        SQLU[SQL Unparse 19LOC]
    end
    
    LL[Logical Expr::Lambda 100LOC]:::green
    LL --> CSE[CSE 50LOC]
    P[Plan logical lambda into physical expr 25LOC]
    UDF1["Extend ScalarUDF[Impl] 300LOC"]
    UDF2[ScalarUDF lambda schema helpers 100LOC]
    AM[Non functional array_transform 270LOC]
    PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]

    PL --> PSFL
    UDF1 --> PSFL
    UDF1 --> AM

    %%CILP[Column::is_lambda_parameter 6LOC]
    
    subgraph "Expr::*_with_schema"
        LTN[Expr::*_with_schema API def 50LOC]:::green
        LTNB[make Expr::*_with_schema lambda aware 70LOC]
        LTN --> LTNES[Expr Simplifier 100LOC]
        LTNA>"
            use Expr::*_with_schema in existing code
            Type Coercion 110LOC
            normalize_col[with_schemas_and_ambiguity_check] 31LOC
            SessionState::create_physical_expr 4LOC
            Expr::infer_placeholder_type 1LOC
            ApplyFunctionRewrites 10LOC
            optmize_projections::rewrite_expr 2LOC
            SqlToRel::try_process_group_by_unnest 10LOC
            sql resolve_columns 5LOC
            Example type_coercion_demo 10LOC
        "]
    end
    
    PL[Physical Lambda Expr 108LOC]:::green
    
    subgraph "PhysicalExpr::*_with_schema"
        PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
        PTNA>"
            use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
            PhysicalExprSimplifier 20LOC
            unwrap_cast_in_comparison 10LOC
            AsyncMapper::find_references 1LOC
            Example CustomCastsPhysicalExprAdapter 10LOC
        "]
        PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
    end
    
    subgraph capture
        CS[Capture support 30LOC]
        LCH["List capture helpers 60LOC(4x15)"]
        MCH["Merge capture with arguments helper 66LOC(2x33)"]
        AMCT[array_transform capture support 20LOC]
    end

    PSFL --> CS
    PTN --> CS
    LL --> SQL
    LL --> P
    UDF2 --> LTNES
    UDF2 --> LTNB
    UDF2 --> PTNB
    LTN --> LTNA
    LTN --> LTNB
    LL --> LTNB
    LTN --> UDF2
    LL --> UDF1 --> UDF2
    UDF2 --> P
    PL --> P
    PL --> PTNB
    PTN --> PTNB
    PTN --> PTNA
    AM --> AMCT
    CS --> AMCT
    LCH --> AMCT
    MCH --> AMCT

    LL --> LTNLP2

subgraph "Expr::*_with_lambdas_params"
        LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
        LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
        AAAAA>"
            expr_applicable_for_cols 5LOC
            Expr::add_column_refs[_counts]/any_column_refs 15LOC
            expr_to_columns 10LOC
            columnize_expr 15LOC
            find_columns_referenced_by_expr 10LOC
            find_column_indexes_referenced_by_expr 5LOC
            normalize_col 10LOC
            normalize_col_with_schemas_and_ambiguity_check 15LOC
            replace_col 10LOC
            transform_up_with_lambdas_params 10LOC
            filter_exprs_evaluation_result_on_empty_batch 10LOC
            replace_cols_by_name 10LOC
            optimizer::push_down_filter::contain 15LOC
            ScalarSubqueryToJoin 40LOC
            TableAliasRewriter 20LOC
            Example ShreddedJsonRewriter 30LOC
        "]
    end

    PL --> PTNLP2

    subgraph "PhysicalExpr::*_with_lambdas_params"
        PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]

        PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
        BBBBB>"
            CustomPhysicalExprAdapter 3LOC
            ParquetPushdownChecker 13LOC
            add_offset_to_expr 3LOC
            update_expr 10LOC
            project_ordering 20LOC
            with_new_schema 10LOC
            collect_columns 10LOC
            reassign_expr_columns 10LOC
            DefaultPhysicalExprAdapter 40LOC
            projection pushdown 50LOC
            sort pushdown 40LOC
            projection 50LOC
            stream_join_utils 40LOC
            pruning predicate rewrite_column_expr 5LOC
            Example DefaultValuePhysicalExprAdapter 20LOC
        "]
    end

Loading

@gstvg gstvg changed the title [DRAFT] Add lambda support and array_transform udf [RFC] Add lambda support and array_transform udf Dec 9, 2025
@fbx31
Copy link

fbx31 commented Dec 11, 2025

Thanks a lot @gstvg, huge amount of work apparently (even if I am not enough skilled to judge). I am waiting for such functions since a very long time.. IMHO, this is a MUST HAVE in datafusion. All serious analytics engine provide this kind of functions, DuckDB implementation is very mature, Polars seems to have it as well even if I didn"t tried it yet and I really hope that datafusion will jump into this enhancement very quickly as it is again a big miss.

Use cases like list_filter(list, expr), list_map(list, expr) and list_reduce(list, base, expr) will be available soon.

Again, thanks a lot for your work and I cross the fingers for a quick progress.

@shehabgamin
Copy link
Contributor

@gstvg This is super exciting! I'll review over the next couple of days.

cc @SparkApplicationMaster @andygrove would be great to get your thoughts too 😃

@gstvg
Copy link
Contributor Author

gstvg commented Dec 14, 2025

Thanks @fbx31. This is high priority for me right now. I also hope we can finish this quickly. It's indeed a lot work, but now I believe it ended up that way because I went to fast in the wrong direction... More on the comment below. Thanks again!

@gstvg
Copy link
Contributor Author

gstvg commented Dec 14, 2025

Thanks @shehabgamin. I analyzed the Spark implementation one last time, and realized I could have done similarly since the beginning... I've already tested it locally and pretend to push soon, the diff reduced to ~2K LOC, mostly in new, self contained code with small changes in existing code and without requiring changes in downstream code. I hope you haven't put time reviewing it yet because I believe the spark approach is much better and also easier/faster to review

Basically, for planning, we lazily set the Field in the lambda column itself, instead of injecting it into a DFSchema

//logical
struct LambdaColumn {
    name: String,
    field: Option<FieldRef>,
}

And for evaluation, again, we lazily set the value of a lambda column instead of injecting it into a RecordBatch

//physical
struct LambdaColumn {
    name: String,
    field: FieldRef,
    value: Option<ColumnarValue>,
}

I think I initially ruled that out because other expressions doesn't contain it's own Field, and this have been discussed before in #12604 *, and didn't went forward, which the difference that the other expressions Field's naturally exist in the plan schema, which now I believe justifies this difference.
And most importantly, because I had no idea of how much work and downstream churn injecting the lambdas parameters into the Schema/RecordBatch would cause 🤦

I will push soon and update the PR description, thanks again!

*This is being discussed again in #18845

@github-actions github-actions bot removed core Core DataFusion crate datasource Changes to the datasource crate labels Dec 15, 2025
@milenkovicm
Copy link
Contributor

looks like latest substrait is not backward compatible (for a patch change) and its breaking 52.2 builds #20756 @benbellick . if you agree perhaps we should pin version of substrat to previous one in datafusion for df 53 release ?

@benbellick
Copy link
Contributor

👋 Hello from the @substrait-io side of things!

I am excited to get this one in! As part of trying to help, I'm working on getting a branch off of this one working with substrait. To bring in lambda suppport, I had to merge current main (which has lambda-containing substrait-rs v0.63.0), but there were a bunch of merge conflicts. I tried to resolve them the best that I (and claude) could, but if this branch is able to either rebase off of main or merge main into it, that would help a lot! Of course, I can also just update that dependency in my local branch as well.

I also may have some questions that come up as I am implementing things. Thanks!

}

pub fn captures(&self) -> &HashSet<usize> {
self.captures.get_or_init(|| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can having a OnceLock here lead to significant performance savings? I dont think lambda body trees are usually very large

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I micro-optimized. I moved a modified version of this traversal to the lambda creation, and OnceLock it's not necessary anymore. But we can also remove it from creation and rerun for every batch, just let me know!

60cabc0#diff-02472981b6b2f1755e8a017d46d809d63f3605c29b26ec5124d281977de3a8dcR99-R101

@LiaCastaneda
Copy link
Contributor

Hey @gstvg, we would like to review this PR in more detail but it's very dense as it is. Do you think it would make sense to split it into independent parts to make reviewing easier? wdyt?

  • Base infrastructure -> Expr::Lambda, Expr::LambdaVariable, Expr::LambdaFunction, LambdaUDF trait, physical counterparts, SQL parsing, type coercion, TreeNode integration, etc.

  • Column capture -> identifying outer column references (the captures), the captures RecordBatch, projection pushdown for lambda bodies. This is a significant feature on top of the core.

  • array_transform implementation -> the first concrete LambdaUDF, flattening, index expansion, list reconstruction?

  • Substrait support -> LambdaFunction handling in producer/consumer.

  • Rest of Lambdas.

Each could build on top of the previous PR and we could keep track of each of them in the original issue/epic -- This is just a rough idea!

@github-actions github-actions bot added ffi Changes to the ffi crate and removed spark labels Mar 18, 2026
@gstvg
Copy link
Contributor Author

gstvg commented Mar 18, 2026

@benbellick I actually merged with main two days ago (15/03), but only pushed today (17/03), really sorry for your wasted time. I was adding more tests and comments, and now I will explore with substrait too on a new branch. Please do ask anything that came up, I will do my best to answer them quickly

@LiaCastaneda I would really like to split this. I started with a complete implementation just to make sure we wouldn't require major reworks on the future.
I'm fine with removing array_transform (~300 lines), but them there would be no function to test the core lambda support with sqllogictests, is that okay? Perhaps removing the support for the second parameter, the value index, and column capture, can be a good middle ground?

@gstvg
Copy link
Contributor Author

gstvg commented Mar 18, 2026

@LiaCastaneda I pushed 6f2c92b removing:

Column and outer variable capture support
Named parameters
Expr simplifier optimizations
array transform second parameter and optimization for sliced lists
Optional LambdaUDF methods (simplify/with_updated_config)
The LambdaVariable field being optional and the tree node rewriter the resolved it

It removed ~1300 lines from the PR

What do you think about also pushing type_coercion for a future PR?
If you think that we should also remove array_transform just let me know

@LiaCastaneda
Copy link
Contributor

I'm fine with removing array_transform (~300 lines), but them there would be no function to test the core lambda support with sqllogictests, is that okay? Perhaps removing the support for the second parameter, the value index, and column capture, can be a good middle ground?

Makes sense, maybe keep array_transform but with basic support only? single parameter lambda (x -> x * 2), no index parameter, no column capture, so its possible to add sqllogictest coverage for those cases showing e2e coverage.

I think type coercion should stay in this PR too, I feel it's more about correctness than a feature.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

catalog Related to the catalog crate common Related to common crate core Core DataFusion crate datasource Changes to the datasource crate execution Related to the execution crate ffi Changes to the ffi crate functions Changes to functions implementation logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Changes to the physical-expr crates proto Related to proto crate sql SQL Planner sqllogictest SQL Logic Tests (.slt) substrait Changes to the substrait crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.