diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 5eb9d6eb1b865..c87847d904a0e 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -22,7 +22,7 @@ use super::ordering::collapse_lex_ordering; use crate::equivalence::{ collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; -use crate::expressions::{CastExpr, Literal}; +use crate::expressions::{CastExpr, Column, Literal}; use crate::sort_properties::{ExprOrdering, SortProperties}; use crate::{ physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement, @@ -179,6 +179,31 @@ impl EquivalenceProperties { self.eq_group.extend(other_eq_group); } + /// Adds children orderings that can be deduced from the ordering of the expression. + fn add_deduced_children_orderings( + &mut self, + expr: &Arc, + options: SortOptions, + ) { + // See whether ordering information can help deduce some ordering children expressions. + let children_orderings = expr.get_children_ordering(options); + let children = expr.children(); + for (child, ordering) in children.into_iter().zip(children_orderings) { + if let SortProperties::Ordered(options) = ordering { + // Add ordering only for leaf expressions + if child.as_any().is::() { + let sort_expr = PhysicalSortExpr { + expr: child, + options, + }; + self.add_new_orderings(vec![vec![sort_expr]]); + } else { + self.add_deduced_children_orderings(&child, options); + } + } + } + } + /// Adds a new equality condition into the existing equivalence group. /// If the given equality defines a new equivalence class, adds this new /// equivalence class to the equivalence group. @@ -187,6 +212,19 @@ impl EquivalenceProperties { left: &Arc, right: &Arc, ) { + let left_expr_ordering = self.get_expr_ordering(left.clone()); + let right_expr_ordering = self.get_expr_ordering(right.clone()); + match (left_expr_ordering.data, right_expr_ordering.data) { + (SortProperties::Ordered(options), SortProperties::Unordered) => { + // Left is ordered, right is not ordered. + self.add_deduced_children_orderings(right, options); + } + (SortProperties::Unordered, SortProperties::Ordered(options)) => { + // Right is ordered, left is not ordered. + self.add_deduced_children_orderings(left, options); + } + (_, _) => {} + } self.eq_group.add_equal_conditions(left, right); } @@ -2199,6 +2237,53 @@ mod tests { ); } + Ok(()) + } + #[test] + fn test_eliminate_redundant_monotonic_sorts() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Date32, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Timestamp(TimeUnit::Nanosecond, None), true), + ])); + let mut properties = EquivalenceProperties::new(schema.clone()) + .with_reorder( + ["a", "b", "c"] + .into_iter() + .map(|c| { + col(c, schema.as_ref()).map(|expr| PhysicalSortExpr { + expr, + options: SortOptions { + descending: false, + nulls_first: true, + }, + }) + }) + .collect::>>()?, + ) + // b is constant, so it should be removed from the sort order + .add_constants(Some(col("b", schema.as_ref())?)); + + // If c is a monotonic expression of a, then order by (c, a) is equivalent to order by (a) + properties.add_equal_conditions( + &(Arc::new(CastExpr::new( + col("c", schema.as_ref())?, + DataType::Date32, + None, + )) as Arc), + &col("a", schema.as_ref())?, + ); + + let sort = &[PhysicalSortExpr { + expr: col("c", schema.as_ref())?, + options: SortOptions { + descending: false, + nulls_first: true, + }, + }]; + + assert!(properties.ordering_satisfy(sort)); + Ok(()) } } diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 0d94642f14e79..f3a897da808a0 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -27,6 +27,7 @@ use DataType::*; use arrow::compute::{can_cast_types, CastOptions}; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; +use arrow_schema::SortOptions; use datafusion_common::format::DEFAULT_FORMAT_OPTIONS; use datafusion_common::{not_impl_err, Result}; use datafusion_expr::interval_arithmetic::Interval; @@ -167,6 +168,11 @@ impl PhysicalExpr for CastExpr { fn get_ordering(&self, children: &[SortProperties]) -> SortProperties { children[0] } + + /// If it is known that [`CastExpr`] have an ordering. Its input should have same ordering. + fn get_children_ordering(&self, ordering: SortOptions) -> Vec { + vec![SortProperties::Ordered(ordering)] + } } impl PartialEq for CastExpr { diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 861a4ad028018..fa56171c4a579 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -27,6 +27,7 @@ use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; +use arrow_schema::SortOptions; use datafusion_common::utils::DataPtr; use datafusion_common::{internal_err, not_impl_err, Result}; use datafusion_expr::interval_arithmetic::Interval; @@ -226,6 +227,15 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties { SortProperties::Unordered } + + /// The order information of the children expressions based the ordering of the expression. + /// If it can be deduced. This is helpful in propagating constraints on the output expression + /// to the children. + fn get_children_ordering(&self, _ordering: SortOptions) -> Vec { + // In the absense of information, assume children have no ordering. + let n_children = self.children().len(); + vec![SortProperties::Unordered; n_children] + } } impl Hash for dyn PhysicalExpr {