-
Notifications
You must be signed in to change notification settings - Fork 2k
Introduce ProjectionExprs::unproject_exprs/project_exprs and improve docs #20193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -842,37 +842,27 @@ impl DataSource for FileScanConfig { | |
| config: &ConfigOptions, | ||
| ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> { | ||
| // Remap filter Column indices to match the table schema (file + partition columns). | ||
| // This is necessary because filters may have been created against a different schema | ||
| // (e.g., after projection pushdown) and need to be remapped to the table schema | ||
| // before being passed to the file source and ultimately serialized. | ||
| // For example, the filter being pushed down is `c1_c2 > 5` and it was created | ||
| // against the output schema of the this `DataSource` which has projection `c1 + c2 as c1_c2`. | ||
| // Thus we need to rewrite the filter back to `c1 + c2 > 5` before passing it to the file source. | ||
| // This is necessary because filters refer to the output schema of this `DataSource` | ||
| // (e.g., after projection pushdown has been applied) and need to be remapped to the table schema | ||
| // before being passed to the file source | ||
| // | ||
| // For example, consider a filter `c1_c2 > 5` being pushed down. If the | ||
| // `DataSource` has a projection `c1 + c2 as c1_c2`, the filter must be rewritten | ||
| // to refer to the table schema `c1 + c2 > 5` | ||
| let table_schema = self.file_source.table_schema().table_schema(); | ||
| // If there's a projection with aliases, first map the filters back through | ||
| // the projection expressions before remapping to the table schema. | ||
| let filters_to_remap = if let Some(projection) = self.file_source.projection() { | ||
| use datafusion_physical_plan::projection::update_expr; | ||
| filters | ||
| .into_iter() | ||
| .map(|filter| { | ||
| update_expr(&filter, projection.as_ref(), true)?.ok_or_else(|| { | ||
| internal_datafusion_err!( | ||
| "Failed to map filter expression through projection: {}", | ||
| filter | ||
| ) | ||
| }) | ||
| }) | ||
| .map(|filter| projection.unproject_expr(&filter)) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The whole point of the PR is to pull this function call into a method on |
||
| .collect::<Result<Vec<_>>>()? | ||
| } else { | ||
| filters | ||
| }; | ||
| // Now remap column indices to match the table schema. | ||
| let remapped_filters: Result<Vec<_>> = filters_to_remap | ||
| let remapped_filters = filters_to_remap | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. driveby cleanup (no functional change) |
||
| .into_iter() | ||
| .map(|filter| reassign_expr_columns(filter, table_schema.as_ref())) | ||
| .collect(); | ||
| let remapped_filters = remapped_filters?; | ||
| .map(|filter| reassign_expr_columns(filter, table_schema)) | ||
| .collect::<Result<Vec<_>>>()?; | ||
|
|
||
| let result = self | ||
| .file_source | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -372,16 +372,8 @@ impl ProjectionExprs { | |
| pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> { | ||
| let mut new_exprs = Vec::with_capacity(other.exprs.len()); | ||
| for proj_expr in other.exprs.iter() { | ||
| let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? | ||
| .ok_or_else(|| { | ||
| internal_datafusion_err!( | ||
| "Failed to combine projections: expression {} could not be applied on top of existing projections {}", | ||
| proj_expr.expr, | ||
| self.exprs.iter().map(|e| format!("{e}")).join(", ") | ||
| ) | ||
| })?; | ||
| new_exprs.push(ProjectionExpr { | ||
| expr: new_expr, | ||
| expr: self.unproject_expr(&proj_expr.expr)?, | ||
| alias: proj_expr.alias.clone(), | ||
| }); | ||
| } | ||
|
|
@@ -450,9 +442,16 @@ impl ProjectionExprs { | |
| } | ||
|
|
||
| /// Project a schema according to this projection. | ||
| /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1, | ||
| /// if the input schema is `[a: Int32, b: Int32, c: Int32]`, the output schema would be `[x: Int32, y: Int32]`. | ||
| /// Fields' metadata are preserved from the input schema. | ||
| /// | ||
| /// For example, given a projection: | ||
| /// * `SELECT a AS x, b + 1 AS y` | ||
| /// * where `a` is at index 0 | ||
| /// * `b` is at index 1 | ||
| /// | ||
| /// If the input schema is `[a: Int32, b: Int32, c: Int32]`, the output | ||
| /// schema would be `[x: Int32, y: Int32]`. | ||
| /// | ||
| /// Note that [`Field`] metadata are preserved from the input schema. | ||
| pub fn project_schema(&self, input_schema: &Schema) -> Result<Schema> { | ||
| let fields: Result<Vec<Field>> = self | ||
| .exprs | ||
|
|
@@ -481,6 +480,48 @@ impl ProjectionExprs { | |
| )) | ||
| } | ||
|
|
||
| /// "unproject" an expression by applying this projection in reverse, | ||
| /// returning a new set of expressions that reference the original input | ||
| /// columns. | ||
| /// | ||
| /// For example, consider | ||
| /// * an expression `c1_c2 > 5`, and a schema `[c1, c2]` | ||
| /// * a projection `c1 + c2 as c1_c2` | ||
| /// | ||
| /// This method would rewrite the expression to `c1 + c2 > 5` | ||
| pub fn unproject_expr( | ||
| &self, | ||
| expr: &Arc<dyn PhysicalExpr>, | ||
| ) -> Result<Arc<dyn PhysicalExpr>> { | ||
| update_expr(expr, &self.exprs, true)?.ok_or_else(|| { | ||
| internal_datafusion_err!( | ||
| "Failed to unproject an expression {} with ProjectionExprs {}", | ||
| expr, | ||
| self.exprs.iter().map(|e| format!("{e}")).join(", ") | ||
| ) | ||
| }) | ||
| } | ||
|
|
||
| /// "project" an expression using these projection's expressions | ||
| /// | ||
| /// For example, consider | ||
| /// * an expression `c1 + c2 > 5`, and a schema `[c1, c2]` | ||
| /// * a projection `c1 + c2 as c1_c2` | ||
| /// | ||
| /// * This method would rewrite the expression to `c1_c2 > 5` | ||
| pub fn project_expr( | ||
| &self, | ||
| expr: &Arc<dyn PhysicalExpr>, | ||
| ) -> Result<Arc<dyn PhysicalExpr>> { | ||
| update_expr(expr, &self.exprs, false)?.ok_or_else(|| { | ||
| internal_datafusion_err!( | ||
| "Failed to project an expression {} with ProjectionExprs {}", | ||
| expr, | ||
| self.exprs.iter().map(|e| format!("{e}")).join(", ") | ||
| ) | ||
| }) | ||
| } | ||
|
|
||
| /// Create a new [`Projector`] from this projection and an input schema. | ||
| /// | ||
| /// A [`Projector`] can be used to apply this projection to record batches. | ||
|
|
@@ -812,26 +853,44 @@ pub fn combine_projections( | |
| )) | ||
| } | ||
|
|
||
| /// The function operates in two modes: | ||
| /// The function projects / unprojects an expression with respect to set of | ||
| /// projection expressions. | ||
| /// | ||
| /// See also [`ProjectionExprs::unproject_expr`] and [`ProjectionExprs::project_expr`] | ||
| /// | ||
| /// 1) When `unproject` is `true`: | ||
| /// | ||
| /// Rewrites an expression with respect to the projection expressions, | ||
| /// effectively "unprojecting" it to reference the original input columns. | ||
| /// | ||
| /// For example, given | ||
| /// * the expressions `a@1 + b@2` and `c@0` | ||
| /// * and projection expressions `c@2, a@0, b@1` | ||
| /// | ||
| /// Then | ||
| /// * `a@1 + b@2` becomes `a@0 + b@1` | ||
| /// * `c@0` becomes `c@2` | ||
| /// | ||
| /// 2) When `unproject` is `false`: | ||
| /// | ||
| /// 1) When `sync_with_child` is `true`: | ||
| /// Rewrites the expression to reference the projected expressions, | ||
| /// effectively "projecting" it. The resulting expression will reference the | ||
| /// indices as they appear in the projection. | ||
| /// | ||
| /// The function updates the indices of `expr` if the expression resides | ||
| /// in the input plan. For instance, given the expressions `a@1 + b@2` | ||
| /// and `c@0` with the input schema `c@2, a@0, b@1`, the expressions are | ||
| /// updated to `a@0 + b@1` and `c@2`. | ||
| /// If the expression cannot be rewritten after the projection, it returns | ||
| /// `None`. | ||
| /// | ||
| /// 2) When `sync_with_child` is `false`: | ||
| /// For example, given | ||
| /// * the expressions `c@0`, `a@1` and `b@2` | ||
| /// * the projection `a@1 as a, c@0 as c_new`, | ||
| /// | ||
| /// The function determines how the expression would be updated if a projection | ||
| /// was placed before the plan associated with the expression. If the expression | ||
| /// cannot be rewritten after the projection, it returns `None`. For example, | ||
| /// given the expressions `c@0`, `a@1` and `b@2`, and the projection with | ||
| /// an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes | ||
| /// `a@0`, but `b@2` results in `None` since the projection does not include `b`. | ||
| /// Then | ||
| /// * `c@0` becomes `c_new@1` | ||
| /// * `a@1` becomes `a@0` | ||
| /// * `b@2` results in `None` since the projection does not include `b`. | ||
| /// | ||
| /// # Errors | ||
| /// This function returns an error if `sync_with_child` is `true` and if any expression references | ||
| /// This function returns an error if `unproject` is `true` and if any expression references | ||
| /// an index that is out of bounds for `projected_exprs`. | ||
| /// For example: | ||
| /// | ||
|
|
@@ -842,7 +901,7 @@ pub fn combine_projections( | |
| pub fn update_expr( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I contemplated going even farther and deprecating this function (and routing everything through ProjectionExprs) but I think that will result in some non trivial API churn and I figured we could always do it later
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think if we were to do this a first step would be to move the logic from |
||
| expr: &Arc<dyn PhysicalExpr>, | ||
| projected_exprs: &[ProjectionExpr], | ||
| sync_with_child: bool, | ||
| unproject: bool, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found this parameter name to be super confusing as there are no "children" involved. I suspect this is historical and predates the work to push expressions down
|
||
| ) -> Result<Option<Arc<dyn PhysicalExpr>>> { | ||
| #[derive(Debug, PartialEq)] | ||
| enum RewriteState { | ||
|
|
@@ -866,7 +925,7 @@ pub fn update_expr( | |
| let Some(column) = expr.as_any().downcast_ref::<Column>() else { | ||
| return Ok(Transformed::no(expr)); | ||
| }; | ||
| if sync_with_child { | ||
| if unproject { | ||
| state = RewriteState::RewrittenValid; | ||
| // Update the index of `column`: | ||
| let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarified that the "different schema" is the "table schema"