Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3600,3 +3600,88 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
);
}
}

/// Regression test for https://github.com/apache/datafusion/issues/20109
///
/// Two stacked `FilterExec` nodes, each carrying an embedded projection,
/// must survive filter-pushdown optimization and still execute correctly.
#[tokio::test]
async fn test_filter_with_projection_pushdown() {
    use arrow::array::{Int64Array, RecordBatch, StringArray};
    use datafusion_physical_plan::collect;
    use datafusion_physical_plan::filter::FilterExec;

    // Input schema: [time, event, size]
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("time", DataType::Int64, false),
        Field::new("event", DataType::Utf8, false),
        Field::new("size", DataType::Int64, false),
    ]));

    // Five sample rows.
    let batch = RecordBatch::try_new(
        input_schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![100i64, 200, 300, 400, 500])),
            Arc::new(StringArray::from(vec![
                "Ingestion", "Ingestion", "Query", "Ingestion", "Query",
            ])),
            Arc::new(Int64Array::from(vec![10i64, 20, 30, 40, 50])),
        ],
    )
    .unwrap();

    // Wrap the batch in an in-memory data source.
    let source = datafusion_datasource::memory::MemorySourceConfig::try_new_exec(
        &[vec![batch]],
        input_schema.clone(),
        None,
    )
    .unwrap();

    // Inner filter: `time < 350`, projecting away `time` (keep [event@1, size@2]).
    let lt_350 = Arc::new(BinaryExpr::new(
        col("time", &source.schema()).unwrap(),
        Operator::Lt,
        Arc::new(Literal::new(ScalarValue::Int64(Some(350)))),
    ));
    let inner_filter = Arc::new(
        FilterExec::try_new(lt_350, source)
            .unwrap()
            .with_projection(Some(vec![1, 2]))
            .unwrap(),
    );

    // Outer filter: `event = 'Ingestion'`, projecting down to [size@1].
    let eq_ingestion = Arc::new(BinaryExpr::new(
        col("event", &inner_filter.schema()).unwrap(),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Utf8(Some(
            "Ingestion".to_string(),
        )))),
    ));
    let outer_filter = Arc::new(
        FilterExec::try_new(eq_ingestion, inner_filter)
            .unwrap()
            .with_projection(Some(vec![1]))
            .unwrap(),
    );

    // Run the filter-pushdown optimizer over the stacked filters.
    let config = ConfigOptions::default();
    let optimized = FilterPushdown::new()
        .optimize(Arc::clone(&outer_filter) as Arc<dyn ExecutionPlan>, &config)
        .unwrap();

    // Executing the optimized plan must succeed (it errored before the fix).
    let ctx = SessionContext::new();
    let batches = collect(optimized, ctx.task_ctx()).await.unwrap();

    // Expected: rows with time < 350 AND event = 'Ingestion' are the ones
    // with time=100 and time=200, so the surviving sizes are 10 and 20.
    let expected = [
        "+------+", "| size |", "+------+", "| 10 |", "| 20 |", "+------+",
    ];
    assert_batches_eq!(expected, &batches);
}
30 changes: 23 additions & 7 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use datafusion_expr::Operator;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
use datafusion_physical_expr::{
AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
conjunction, split_conjunction,
Expand Down Expand Up @@ -526,10 +526,26 @@ impl ExecutionPlan for FilterExec {
return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
}
// We absorb any parent filters that were not handled by our children
let unsupported_parent_filters =
child_pushdown_result.parent_filters.iter().filter_map(|f| {
matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
});
let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
child_pushdown_result
.parent_filters
.iter()
.filter_map(|f| {
matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
})
.collect();

// If this FilterExec has a projection, the unsupported parent filters
// are in the output schema (after projection) coordinates. We need to
// remap them to the input schema coordinates before combining with self filters.
if self.projection.is_some() {
let input_schema = self.input().schema();
unsupported_parent_filters = unsupported_parent_filters
.into_iter()
.map(|expr| reassign_expr_columns(expr, &input_schema))
.collect::<Result<Vec<_>>>()?;
}

let unsupported_self_filters = child_pushdown_result
.self_filters
.first()
Expand Down Expand Up @@ -577,7 +593,7 @@ impl ExecutionPlan for FilterExec {
// The new predicate is the same as our current predicate
None
} else {
// Create a new FilterExec with the new predicate
// Create a new FilterExec with the new predicate, preserving the projection
let new = FilterExec {
predicate: Arc::clone(&new_predicate),
input: Arc::clone(&filter_input),
Expand All @@ -589,7 +605,7 @@ impl ExecutionPlan for FilterExec {
self.default_selectivity,
self.projection.as_ref(),
)?,
projection: None,
projection: self.projection.clone(),
batch_size: self.batch_size,
fetch: self.fetch,
};
Expand Down