Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
dbf2aa5
add lambda support
gstvg Nov 22, 2025
fa4a8fb
add lambdas: None to existing ScalarFunctionArgs in tests/benches
gstvg Nov 22, 2025
b18d214
simplify lambda support
gstvg Dec 15, 2025
d844b2d
rename LambdaColumn to LambdaVariable
gstvg Dec 19, 2025
e1921eb
feat: add LambdaUDF
gstvg Feb 23, 2026
1f19c64
feat: remove lambda support for ScalarUDF
gstvg Feb 23, 2026
570cc53
temporarily add pr description as DOC.md
gstvg Mar 1, 2026
83dfbdd
add lambda note in substrait consumer
gstvg Mar 8, 2026
34137e1
add LambdaSignature
gstvg Mar 8, 2026
3ded115
improve lambda type coercion
gstvg Mar 8, 2026
82930ec
lambda function type coercion: stop using unstable Iterator::eq_by
gstvg Mar 9, 2026
86d5999
remove signature section from DOC.md
gstvg Mar 9, 2026
60cabc0
polish lambda impl
gstvg Mar 15, 2026
41152c3
minor improvoments
gstvg Mar 15, 2026
2be9e54
Merge branch 'main' into lambda4
gstvg Mar 15, 2026
90eb08f
improve lambdas
gstvg Mar 17, 2026
d874db7
cargo fmt
gstvg Mar 17, 2026
a59ffe8
simplify LambdaUDF coerce_value_types
gstvg Mar 17, 2026
cd22c04
remove DOC.md
gstvg Mar 18, 2026
0188d40
add physical lambda function comments
gstvg Mar 18, 2026
6f2c92b
remove secondary lambda features to be added later
gstvg Mar 18, 2026
b3bdc48
fix removal of lambda features
gstvg Mar 18, 2026
9728a2e
fix typo
gstvg Mar 18, 2026
811aa0a
Merge branch 'main' of https://github.com/apache/datafusion into lambda4
gstvg Mar 18, 2026
f724ef5
remove paste! from lambda macros
gstvg Mar 19, 2026
5380884
fix lambda sqllogictests
gstvg Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions datafusion-examples/examples/sql_ops/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use datafusion::common::{TableReference, plan_err};
use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::logical_expr::{
AggregateUDF, Expr, LogicalPlan, ScalarUDF, TableProviderFilterPushDown, TableSource,
WindowUDF,
AggregateUDF, Expr, LambdaUDF, LogicalPlan, ScalarUDF, TableProviderFilterPushDown,
TableSource, WindowUDF,
};
use datafusion::optimizer::{
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule,
Expand Down Expand Up @@ -155,6 +155,10 @@ impl ContextProvider for MyContextProvider {
None
}

fn get_lambda_meta(&self, _name: &str) -> Option<Arc<dyn LambdaUDF>> {
None
}

fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
None
}
Expand All @@ -175,6 +179,10 @@ impl ContextProvider for MyContextProvider {
Vec::new()
}

fn udlf_names(&self) -> Vec<String> {
Vec::new()
}

fn udaf_names(&self) -> Vec<String> {
Vec::new()
}
Expand Down
14 changes: 13 additions & 1 deletion datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
| Expr::ScalarSubquery(_)
| Expr::SetComparison(_)
| Expr::GroupingSet(_)
| Expr::Case(_) => Ok(TreeNodeRecursion::Continue),
| Expr::Case(_)
| Expr::Lambda(_)
| Expr::LambdaVariable(_) => Ok(TreeNodeRecursion::Continue),

Expr::ScalarFunction(scalar_function) => {
match scalar_function.func.signature().volatility {
Expand All @@ -97,6 +99,16 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
}
}
}
Expr::LambdaFunction(lambda_function) => {
match lambda_function.func.signature().volatility {
Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
// TODO: Stable functions could be `applicable`, but that would require access to the context
Volatility::Stable | Volatility::Volatile => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
}
}
}

// TODO other expressions are not handled yet:
// - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
Expand Down
18 changes: 17 additions & 1 deletion datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod proxy;
pub mod string_utils;

use crate::assert_or_internal_err;
use crate::error::{_exec_datafusion_err, _internal_datafusion_err};
use crate::error::{_exec_datafusion_err, _exec_err, _internal_datafusion_err};
use crate::{Result, ScalarValue};
use arrow::array::{
Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray, OffsetSizeTrait,
Expand Down Expand Up @@ -971,11 +971,27 @@ pub fn take_function_args<const N: usize, T>(
})
}

/// Returns the inner values of a list, or an error otherwise
/// For [`ListArray`] and [`LargeListArray`], if it's sliced, it returns a
/// sliced array too. Therefore, too reconstruct a list using it,
/// you must adjust the offsets using [`adjust_offsets_for_slice`]
pub fn list_values(array: &dyn Array) -> Result<ArrayRef> {
match array.data_type() {
DataType::List(_) => Ok(Arc::clone(array.as_list::<i32>().values())),
DataType::LargeList(_) => Ok(Arc::clone(array.as_list::<i64>().values())),
DataType::FixedSizeList(_, _) => {
Ok(Arc::clone(array.as_fixed_size_list().values()))
}
other => _exec_err!("expected list, got {other}"),
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::ScalarValue::Null;
use arrow::array::Float64Array;
use sqlparser::ast::Ident;

#[test]
fn test_bisect_linear_left_and_right() -> Result<()> {
Expand Down
14 changes: 13 additions & 1 deletion datafusion/core/src/bin/print_functions_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use datafusion::execution::SessionStateDefaults;
use datafusion_common::{HashSet, Result, not_impl_err};
use datafusion_expr::{
AggregateUDF, DocSection, Documentation, ScalarUDF, WindowUDF,
AggregateUDF, DocSection, Documentation, LambdaUDF, ScalarUDF, WindowUDF,
aggregate_doc_sections, scalar_doc_sections, window_doc_sections,
};
use itertools::Itertools;
Expand Down Expand Up @@ -282,6 +282,18 @@ impl DocProvider for WindowUDF {
}
}

impl DocProvider for dyn LambdaUDF {
fn get_name(&self) -> String {
self.name().to_string()
}
fn get_aliases(&self) -> Vec<String> {
self.aliases().iter().map(|a| a.to_string()).collect()
}
fn get_documentation(&self) -> Option<&Documentation> {
self.documentation()
}
}

#[expect(clippy::borrowed_box)]
fn get_names_and_aliases(functions: &Vec<&Box<dyn DocProvider>>) -> Vec<String> {
functions
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,11 @@ mod tests {
) -> &HashMap<String, Arc<datafusion_expr::ScalarUDF>> {
unimplemented!()
}
fn lambda_functions(
&self,
) -> &HashMap<String, Arc<dyn datafusion_expr::LambdaUDF>> {
unimplemented!()
}
fn aggregate_functions(
&self,
) -> &HashMap<String, Arc<datafusion_expr::AggregateUDF>> {
Expand Down
16 changes: 16 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ use datafusion_execution::disk_manager::{
DEFAULT_MAX_TEMP_DIRECTORY_SIZE, DiskManagerBuilder,
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::LambdaUDF;
pub use datafusion_expr::execution_props::ExecutionProps;
#[cfg(feature = "sql")]
use datafusion_expr::planner::RelationPlanner;
Expand Down Expand Up @@ -1976,6 +1977,10 @@ impl FunctionRegistry for SessionContext {
self.state.read().udf(name)
}

fn udlf(&self, name: &str) -> Result<Arc<dyn LambdaUDF>> {
self.state.read().udlf(name)
}

fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
self.state.read().udaf(name)
}
Expand All @@ -1988,6 +1993,13 @@ impl FunctionRegistry for SessionContext {
self.state.write().register_udf(udf)
}

fn register_udlf(
&mut self,
udlf: Arc<dyn LambdaUDF>,
) -> Result<Option<Arc<dyn LambdaUDF>>> {
self.state.write().register_udlf(udlf)
}

fn register_udaf(
&mut self,
udaf: Arc<AggregateUDF>,
Expand Down Expand Up @@ -2017,6 +2029,10 @@ impl FunctionRegistry for SessionContext {
self.state.write().register_expr_planner(expr_planner)
}

fn udlfs(&self) -> HashSet<String> {
self.state.read().udlfs()
}

fn udafs(&self) -> HashSet<String> {
self.state.read().udafs()
}
Expand Down
Loading
Loading