diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 9956b120f18ef..8c64ce7842195 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -842,37 +842,27 @@ impl DataSource for FileScanConfig { config: &ConfigOptions, ) -> Result>> { // Remap filter Column indices to match the table schema (file + partition columns). - // This is necessary because filters may have been created against a different schema - // (e.g., after projection pushdown) and need to be remapped to the table schema - // before being passed to the file source and ultimately serialized. - // For example, the filter being pushed down is `c1_c2 > 5` and it was created - // against the output schema of the this `DataSource` which has projection `c1 + c2 as c1_c2`. - // Thus we need to rewrite the filter back to `c1 + c2 > 5` before passing it to the file source. + // This is necessary because filters refer to the output schema of this `DataSource` + // (e.g., after projection pushdown has been applied) and need to be remapped to the table schema + // before being passed to the file source + // + // For example, consider a filter `c1_c2 > 5` being pushed down. If the + // `DataSource` has a projection `c1 + c2 as c1_c2`, the filter must be rewritten + // to refer to the table schema `c1 + c2 > 5` let table_schema = self.file_source.table_schema().table_schema(); - // If there's a projection with aliases, first map the filters back through - // the projection expressions before remapping to the table schema. let filters_to_remap = if let Some(projection) = self.file_source.projection() { - use datafusion_physical_plan::projection::update_expr; filters .into_iter() - .map(|filter| { - update_expr(&filter, projection.as_ref(), true)?.ok_or_else(|| { - internal_datafusion_err!( - "Failed to map filter expression through projection: {}", - filter - ) - }) - }) + .map(|filter| projection.unproject_expr(&filter)) .collect::>>()? } else { filters }; // Now remap column indices to match the table schema. - let remapped_filters: Result> = filters_to_remap + let remapped_filters = filters_to_remap .into_iter() - .map(|filter| reassign_expr_columns(filter, table_schema.as_ref())) - .collect(); - let remapped_filters = remapped_filters?; + .map(|filter| reassign_expr_columns(filter, table_schema)) + .collect::>>()?; let result = self .file_source diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index ae83a74627bed..dbbd289415277 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -372,16 +372,8 @@ impl ProjectionExprs { pub fn try_merge(&self, other: &ProjectionExprs) -> Result { let mut new_exprs = Vec::with_capacity(other.exprs.len()); for proj_expr in other.exprs.iter() { - let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? - .ok_or_else(|| { - internal_datafusion_err!( - "Failed to combine projections: expression {} could not be applied on top of existing projections {}", - proj_expr.expr, - self.exprs.iter().map(|e| format!("{e}")).join(", ") - ) - })?; new_exprs.push(ProjectionExpr { - expr: new_expr, + expr: self.unproject_expr(&proj_expr.expr)?, alias: proj_expr.alias.clone(), }); } @@ -450,9 +442,16 @@ impl ProjectionExprs { } /// Project a schema according to this projection. - /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1, - /// if the input schema is `[a: Int32, b: Int32, c: Int32]`, the output schema would be `[x: Int32, y: Int32]`. - /// Fields' metadata are preserved from the input schema. + /// + /// For example, given a projection: + /// * `SELECT a AS x, b + 1 AS y` + /// * where `a` is at index 0 + /// * `b` is at index 1 + /// + /// If the input schema is `[a: Int32, b: Int32, c: Int32]`, the output + /// schema would be `[x: Int32, y: Int32]`. + /// + /// Note that [`Field`] metadata are preserved from the input schema. pub fn project_schema(&self, input_schema: &Schema) -> Result { let fields: Result> = self .exprs @@ -481,6 +480,48 @@ impl ProjectionExprs { )) } + /// "unproject" an expression by applying this projection in reverse, + /// returning a new set of expressions that reference the original input + /// columns. + /// + /// For example, consider + /// * an expression `c1_c2 > 5`, and a schema `[c1, c2]` + /// * a projection `c1 + c2 as c1_c2` + /// + /// This method would rewrite the expression to `c1 + c2 > 5` + pub fn unproject_expr( + &self, + expr: &Arc, + ) -> Result> { + update_expr(expr, &self.exprs, true)?.ok_or_else(|| { + internal_datafusion_err!( + "Failed to unproject an expression {} with ProjectionExprs {}", + expr, + self.exprs.iter().map(|e| format!("{e}")).join(", ") + ) + }) + } + + /// "project" an expression using these projection's expressions + /// + /// For example, consider + /// * an expression `c1 + c2 > 5`, and a schema `[c1, c2]` + /// * a projection `c1 + c2 as c1_c2` + /// + /// * This method would rewrite the expression to `c1_c2 > 5` + pub fn project_expr( + &self, + expr: &Arc, + ) -> Result> { + update_expr(expr, &self.exprs, false)?.ok_or_else(|| { + internal_datafusion_err!( + "Failed to project an expression {} with ProjectionExprs {}", + expr, + self.exprs.iter().map(|e| format!("{e}")).join(", ") + ) + }) + } + /// Create a new [`Projector`] from this projection and an input schema. /// /// A [`Projector`] can be used to apply this projection to record batches. @@ -812,26 +853,44 @@ pub fn combine_projections( )) } -/// The function operates in two modes: +/// The function projects / unprojects an expression with respect to set of +/// projection expressions. +/// +/// See also [`ProjectionExprs::unproject_expr`] and [`ProjectionExprs::project_expr`] +/// +/// 1) When `unproject` is `true`: +/// +/// Rewrites an expression with respect to the projection expressions, +/// effectively "unprojecting" it to reference the original input columns. +/// +/// For example, given +/// * the expressions `a@1 + b@2` and `c@0` +/// * and projection expressions `c@2, a@0, b@1` +/// +/// Then +/// * `a@1 + b@2` becomes `a@0 + b@1` +/// * `c@0` becomes `c@2` +/// +/// 2) When `unproject` is `false`: /// -/// 1) When `sync_with_child` is `true`: +/// Rewrites the expression to reference the projected expressions, +/// effectively "projecting" it. The resulting expression will reference the +/// indices as they appear in the projection. /// -/// The function updates the indices of `expr` if the expression resides -/// in the input plan. For instance, given the expressions `a@1 + b@2` -/// and `c@0` with the input schema `c@2, a@0, b@1`, the expressions are -/// updated to `a@0 + b@1` and `c@2`. +/// If the expression cannot be rewritten after the projection, it returns +/// `None`. /// -/// 2) When `sync_with_child` is `false`: +/// For example, given +/// * the expressions `c@0`, `a@1` and `b@2` +/// * the projection `a@1 as a, c@0 as c_new`, /// -/// The function determines how the expression would be updated if a projection -/// was placed before the plan associated with the expression. If the expression -/// cannot be rewritten after the projection, it returns `None`. For example, -/// given the expressions `c@0`, `a@1` and `b@2`, and the projection with -/// an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes -/// `a@0`, but `b@2` results in `None` since the projection does not include `b`. +/// Then +/// * `c@0` becomes `c_new@1` +/// * `a@1` becomes `a@0` +/// * `b@2` results in `None` since the projection does not include `b`. /// /// # Errors -/// This function returns an error if `sync_with_child` is `true` and if any expression references +/// This function returns an error if `unproject` is `true` and if any expression references /// an index that is out of bounds for `projected_exprs`. /// For example: /// @@ -842,7 +901,7 @@ pub fn combine_projections( pub fn update_expr( expr: &Arc, projected_exprs: &[ProjectionExpr], - sync_with_child: bool, + unproject: bool, ) -> Result>> { #[derive(Debug, PartialEq)] enum RewriteState { @@ -866,7 +925,7 @@ pub fn update_expr( let Some(column) = expr.as_any().downcast_ref::() else { return Ok(Transformed::no(expr)); }; - if sync_with_child { + if unproject { state = RewriteState::RewrittenValid; // Update the index of `column`: let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {