From 32a77c250bfdc574bd4d0acd7bb9db2beb3a69f6 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Wed, 4 Feb 2026 05:26:51 +0800 Subject: [PATCH 1/2] fix: filter pushdown when merge filter (#20110) ## Which issue does this PR close? - Closes #20109 ## Rationale for this change see issue #20109 ## What changes are included in this PR? 1. Remap parent filter expressions: When a FilterExec has a projection, remap unsupported parent filter expressions from output schema coordinates to input schema coordinates using `reassign_expr_columns()` before combining them with the current filter's predicates. 2. Preserve projection: When creating the merged FilterExec, preserve the original projection instead of discarding it . ## Are these changes tested? yes, add some test case ## Are there any user-facing changes? --------- Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> --- .../physical_optimizer/filter_pushdown/mod.rs | 87 +++++++++++++++++++ datafusion/physical-plan/src/filter.rs | 30 +++++-- 2 files changed, 110 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index d6357fdf6bc7d..8baf079aa5ba9 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -3600,3 +3600,90 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() { ); } } + +/// Regression test for https://github.com/apache/datafusion/issues/20109 +#[tokio::test] +async fn test_filter_with_projection_pushdown() { + use arrow::array::{Int64Array, RecordBatch, StringArray}; + use datafusion_physical_plan::collect; + use datafusion_physical_plan::filter::FilterExecBuilder; + + // Create schema: [time, event, size] + let schema = Arc::new(Schema::new(vec![ + Field::new("time", DataType::Int64, false), + Field::new("event", DataType::Utf8, false), + Field::new("size", DataType::Int64, false), + ])); + + // Create sample data + let timestamps = vec![100i64, 200, 300, 400, 500]; + let events = vec!["Ingestion", "Ingestion", "Query", "Ingestion", "Query"]; + let sizes = vec![10i64, 20, 30, 40, 50]; + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(timestamps)), + Arc::new(StringArray::from(events)), + Arc::new(Int64Array::from(sizes)), + ], + ) + .unwrap(); + + // Create data source + let memory_exec = datafusion_datasource::memory::MemorySourceConfig::try_new_exec( + &[vec![batch]], + schema.clone(), + None, + ) + .unwrap(); + + // First FilterExec: time < 350 with projection=[event@1, size@2] + let time_col = col("time", &memory_exec.schema()).unwrap(); + let time_filter = Arc::new(BinaryExpr::new( + time_col, + Operator::Lt, + Arc::new(Literal::new(ScalarValue::Int64(Some(350)))), + )); + let filter1 = Arc::new( + FilterExecBuilder::new(time_filter, memory_exec) + .apply_projection(Some(vec![1, 2])) + .unwrap() + .build() + .unwrap(), + ); + + // Second FilterExec: event = 'Ingestion' with projection=[size@1] + let event_col = col("event", &filter1.schema()).unwrap(); + let event_filter = Arc::new(BinaryExpr::new( + event_col, + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some( + "Ingestion".to_string(), + )))), + )); + let filter2 = Arc::new( + FilterExecBuilder::new(event_filter, filter1) + .apply_projection(Some(vec![1])) + .unwrap() + .build() + .unwrap(), + ); + + // Apply filter pushdown optimization + let config = ConfigOptions::default(); + let optimized_plan = FilterPushdown::new() + .optimize(Arc::clone(&filter2) as Arc, &config) + .unwrap(); + + // Execute the optimized plan - this should not error + let ctx = SessionContext::new(); + let result = collect(optimized_plan, ctx.task_ctx()).await.unwrap(); + + // Verify results: should return rows where time < 350 AND event = 'Ingestion' + // That's rows with time=100,200 (both have event='Ingestion'), so sizes 10,20 + let expected = [ + "+------+", "| size |", "+------+", "| 10 |", "| 20 |", "+------+", + ]; + assert_batches_eq!(expected, &result); +} diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index e724cdad64840..82db5c7371c65 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -57,7 +57,7 @@ use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit}; use datafusion_physical_expr::intervals::utils::check_support; -use datafusion_physical_expr::utils::collect_columns; +use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; use datafusion_physical_expr::{ AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze, conjunction, split_conjunction, @@ -526,10 +526,26 @@ impl ExecutionPlan for FilterExec { return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); } // We absorb any parent filters that were not handled by our children - let unsupported_parent_filters = - child_pushdown_result.parent_filters.iter().filter_map(|f| { - matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter)) - }); + let mut unsupported_parent_filters: Vec> = + child_pushdown_result + .parent_filters + .iter() + .filter_map(|f| { + matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter)) + }) + .collect(); + + // If this FilterExec has a projection, the unsupported parent filters + // are in the output schema (after projection) coordinates. We need to + // remap them to the input schema coordinates before combining with self filters. + if self.projection.is_some() { + let input_schema = self.input().schema(); + unsupported_parent_filters = unsupported_parent_filters + .into_iter() + .map(|expr| reassign_expr_columns(expr, &input_schema)) + .collect::>>()?; + } + let unsupported_self_filters = child_pushdown_result .self_filters .first() @@ -577,7 +593,7 @@ impl ExecutionPlan for FilterExec { // The new predicate is the same as our current predicate None } else { - // Create a new FilterExec with the new predicate + // Create a new FilterExec with the new predicate, preserving the projection let new = FilterExec { predicate: Arc::clone(&new_predicate), input: Arc::clone(&filter_input), @@ -589,7 +605,7 @@ impl ExecutionPlan for FilterExec { self.default_selectivity, self.projection.as_ref(), )?, - projection: None, + projection: self.projection.clone(), batch_size: self.batch_size, fetch: self.fetch, }; From 8d1a3c80d7300fd80312d9ad389c4dba81add520 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Wed, 11 Feb 2026 22:11:25 +0800 Subject: [PATCH 2/2] fix test case --- .../tests/physical_optimizer/filter_pushdown/mod.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 8baf079aa5ba9..ad0d22e24af3e 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -3606,7 +3606,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() { async fn test_filter_with_projection_pushdown() { use arrow::array::{Int64Array, RecordBatch, StringArray}; use datafusion_physical_plan::collect; - use datafusion_physical_plan::filter::FilterExecBuilder; + use datafusion_physical_plan::filter::FilterExec; // Create schema: [time, event, size] let schema = Arc::new(Schema::new(vec![ @@ -3646,10 +3646,9 @@ async fn test_filter_with_projection_pushdown() { Arc::new(Literal::new(ScalarValue::Int64(Some(350)))), )); let filter1 = Arc::new( - FilterExecBuilder::new(time_filter, memory_exec) - .apply_projection(Some(vec![1, 2])) + FilterExec::try_new(time_filter, memory_exec) .unwrap() - .build() + .with_projection(Some(vec![1, 2])) .unwrap(), ); @@ -3663,10 +3662,9 @@ async fn test_filter_with_projection_pushdown() { )))), )); let filter2 = Arc::new( - FilterExecBuilder::new(event_filter, filter1) - .apply_projection(Some(vec![1])) + FilterExec::try_new(event_filter, filter1) .unwrap() - .build() + .with_projection(Some(vec![1])) .unwrap(), );