Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 11 additions & 21 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,37 +842,27 @@ impl DataSource for FileScanConfig {
config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
// Remap filter Column indices to match the table schema (file + partition columns).
// This is necessary because filters may have been created against a different schema
// (e.g., after projection pushdown) and need to be remapped to the table schema
// before being passed to the file source and ultimately serialized.
// For example, the filter being pushed down is `c1_c2 > 5` and it was created
// against the output schema of the this `DataSource` which has projection `c1 + c2 as c1_c2`.
// Thus we need to rewrite the filter back to `c1 + c2 > 5` before passing it to the file source.
// This is necessary because filters refer to the output schema of this `DataSource`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarified that the "different schema" is the "table schema"

// (e.g., after projection pushdown has been applied) and need to be remapped to the table schema
// before being passed to the file source
//
// For example, consider a filter `c1_c2 > 5` being pushed down. If the
// `DataSource` has a projection `c1 + c2 as c1_c2`, the filter must be rewritten
// to refer to the table schema `c1 + c2 > 5`
let table_schema = self.file_source.table_schema().table_schema();
// If there's a projection with aliases, first map the filters back through
// the projection expressions before remapping to the table schema.
let filters_to_remap = if let Some(projection) = self.file_source.projection() {
use datafusion_physical_plan::projection::update_expr;
filters
.into_iter()
.map(|filter| {
update_expr(&filter, projection.as_ref(), true)?.ok_or_else(|| {
internal_datafusion_err!(
"Failed to map filter expression through projection: {}",
filter
)
})
})
.map(|filter| projection.unproject_expr(&filter))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole point of the PR is to pull this function call into a method on ProjectionExprs and add documentation

.collect::<Result<Vec<_>>>()?
} else {
filters
};
// Now remap column indices to match the table schema.
let remapped_filters: Result<Vec<_>> = filters_to_remap
let remapped_filters = filters_to_remap
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

driveby cleanup (no functional change)

.into_iter()
.map(|filter| reassign_expr_columns(filter, table_schema.as_ref()))
.collect();
let remapped_filters = remapped_filters?;
.map(|filter| reassign_expr_columns(filter, table_schema))
.collect::<Result<Vec<_>>>()?;

let result = self
.file_source
Expand Down
115 changes: 87 additions & 28 deletions datafusion/physical-expr/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,8 @@ impl ProjectionExprs {
pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
let mut new_exprs = Vec::with_capacity(other.exprs.len());
for proj_expr in other.exprs.iter() {
let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
.ok_or_else(|| {
internal_datafusion_err!(
"Failed to combine projections: expression {} could not be applied on top of existing projections {}",
proj_expr.expr,
self.exprs.iter().map(|e| format!("{e}")).join(", ")
)
})?;
new_exprs.push(ProjectionExpr {
expr: new_expr,
expr: self.unproject_expr(&proj_expr.expr)?,
alias: proj_expr.alias.clone(),
});
}
Expand Down Expand Up @@ -450,9 +442,16 @@ impl ProjectionExprs {
}

/// Project a schema according to this projection.
/// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
/// if the input schema is `[a: Int32, b: Int32, c: Int32]`, the output schema would be `[x: Int32, y: Int32]`.
/// Fields' metadata are preserved from the input schema.
///
/// For example, given a projection:
/// * `SELECT a AS x, b + 1 AS y`
/// * where `a` is at index 0
/// * `b` is at index 1
///
/// If the input schema is `[a: Int32, b: Int32, c: Int32]`, the output
/// schema would be `[x: Int32, y: Int32]`.
///
/// Note that [`Field`] metadata are preserved from the input schema.
pub fn project_schema(&self, input_schema: &Schema) -> Result<Schema> {
let fields: Result<Vec<Field>> = self
.exprs
Expand Down Expand Up @@ -481,6 +480,48 @@ impl ProjectionExprs {
))
}

/// "unproject" an expression by applying this projection in reverse,
/// returning a new set of expressions that reference the original input
/// columns.
///
/// For example, consider
/// * an expression `c1_c2 > 5`, and a schema `[c1, c2]`
/// * a projection `c1 + c2 as c1_c2`
///
/// This method would rewrite the expression to `c1 + c2 > 5`
pub fn unproject_expr(
&self,
expr: &Arc<dyn PhysicalExpr>,
) -> Result<Arc<dyn PhysicalExpr>> {
update_expr(expr, &self.exprs, true)?.ok_or_else(|| {
internal_datafusion_err!(
"Failed to unproject an expression {} with ProjectionExprs {}",
expr,
self.exprs.iter().map(|e| format!("{e}")).join(", ")
)
})
}

/// "project" an expression using these projection's expressions
///
/// For example, consider
/// * an expression `c1 + c2 > 5`, and a schema `[c1, c2]`
/// * a projection `c1 + c2 as c1_c2`
///
/// * This method would rewrite the expression to `c1_c2 > 5`
pub fn project_expr(
&self,
expr: &Arc<dyn PhysicalExpr>,
) -> Result<Arc<dyn PhysicalExpr>> {
update_expr(expr, &self.exprs, false)?.ok_or_else(|| {
internal_datafusion_err!(
"Failed to project an expression {} with ProjectionExprs {}",
expr,
self.exprs.iter().map(|e| format!("{e}")).join(", ")
)
})
}

/// Create a new [`Projector`] from this projection and an input schema.
///
/// A [`Projector`] can be used to apply this projection to record batches.
Expand Down Expand Up @@ -812,26 +853,44 @@ pub fn combine_projections(
))
}

/// The function operates in two modes:
/// The function projects / unprojects an expression with respect to set of
/// projection expressions.
///
/// See also [`ProjectionExprs::unproject_expr`] and [`ProjectionExprs::project_expr`]
///
/// 1) When `unproject` is `true`:
///
/// Rewrites an expression with respect to the projection expressions,
/// effectively "unprojecting" it to reference the original input columns.
///
/// For example, given
/// * the expressions `a@1 + b@2` and `c@0`
/// * and projection expressions `c@2, a@0, b@1`
///
/// Then
/// * `a@1 + b@2` becomes `a@0 + b@1`
/// * `c@0` becomes `c@2`
///
/// 2) When `unproject` is `false`:
///
/// 1) When `sync_with_child` is `true`:
/// Rewrites the expression to reference the projected expressions,
/// effectively "projecting" it. The resulting expression will reference the
/// indices as they appear in the projection.
///
/// The function updates the indices of `expr` if the expression resides
/// in the input plan. For instance, given the expressions `a@1 + b@2`
/// and `c@0` with the input schema `c@2, a@0, b@1`, the expressions are
/// updated to `a@0 + b@1` and `c@2`.
/// If the expression cannot be rewritten after the projection, it returns
/// `None`.
///
/// 2) When `sync_with_child` is `false`:
/// For example, given
/// * the expressions `c@0`, `a@1` and `b@2`
/// * the projection `a@1 as a, c@0 as c_new`,
///
/// The function determines how the expression would be updated if a projection
/// was placed before the plan associated with the expression. If the expression
/// cannot be rewritten after the projection, it returns `None`. For example,
/// given the expressions `c@0`, `a@1` and `b@2`, and the projection with
/// an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes
/// `a@0`, but `b@2` results in `None` since the projection does not include `b`.
/// Then
/// * `c@0` becomes `c_new@1`
/// * `a@1` becomes `a@0`
/// * `b@2` results in `None` since the projection does not include `b`.
///
/// # Errors
/// This function returns an error if `sync_with_child` is `true` and if any expression references
/// This function returns an error if `unproject` is `true` and if any expression references
/// an index that is out of bounds for `projected_exprs`.
/// For example:
///
Expand All @@ -842,7 +901,7 @@ pub fn combine_projections(
pub fn update_expr(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I contemplated going even farther and deprecating this function (and routing everything through ProjectionExprs) but I think that will result in some non trivial API churn and I figured we could always do it later

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we were to do this a first step would be to move the logic from update_expr into the functions on ProjectionExprs and make update_expr call those, maybe adding a note that it's "soft deprecated" and we recommend calling ProjectionExprs instead? I think that could fit in this PR.

expr: &Arc<dyn PhysicalExpr>,
projected_exprs: &[ProjectionExpr],
sync_with_child: bool,
unproject: bool,
Copy link
Contributor Author

@alamb alamb Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this parameter name to be super confusing as there are no "children" involved. I suspect this is historical and predates the work to push expressions down

unproject as the best I could come up with -- but would love some thoughts on better names

) -> Result<Option<Arc<dyn PhysicalExpr>>> {
#[derive(Debug, PartialEq)]
enum RewriteState {
Expand All @@ -866,7 +925,7 @@ pub fn update_expr(
let Some(column) = expr.as_any().downcast_ref::<Column>() else {
return Ok(Transformed::no(expr));
};
if sync_with_child {
if unproject {
state = RewriteState::RewrittenValid;
// Update the index of `column`:
let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {
Expand Down