Better document the relationship between FileFormat::projection / FileFormat::filter and FileScanConfig::Statistics
#20188
```diff
@@ -46,6 +46,12 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource>
 /// file format specific behaviors for elements in [`DataSource`]
+///
+/// # Schema information
+/// There are two important schemas for a [`FileSource`]:
+/// 1. [`Self::table_schema`] -- the schema for the overall "table"
```
Contributor: Does this include partition columns?

Contributor: I think it would be worth explicitly noting in the `table_schema()` doc that it includes partition columns, e.g. "the schema for the overall table (file schema + partition columns)". The `FileScanConfig` docs mention this, but it's easy to miss when reading `FileSource` alone.
```diff
+/// 2. The logical output schema, comprised of [`Self::table_schema`] with
+///    [`Self::projection`] applied
 ///
 /// See more details on specific implementations:
 /// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html)
 /// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html)
```
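The two-schema distinction described in the new docs can be illustrated with a toy model. The `Schema` struct and `apply_projection` function below are hypothetical stand-ins for the real Arrow/DataFusion types, not the actual API:

```rust
// Toy stand-ins for the real Arrow/DataFusion types, for illustration only.
#[derive(Debug, Clone, PartialEq)]
struct Schema {
    // Column names: file columns followed by partition columns.
    columns: Vec<String>,
}

/// A projection is a list of column indices into the table schema.
/// The logical output schema is the table schema with the projection applied.
fn apply_projection(table_schema: &Schema, projection: &[usize]) -> Schema {
    Schema {
        columns: projection
            .iter()
            .map(|&i| table_schema.columns[i].clone())
            .collect(),
    }
}

fn main() {
    // Table schema: file columns ("a", "b") plus a partition column ("date").
    let table_schema = Schema {
        columns: vec!["a".into(), "b".into(), "date".into()],
    };
    let output = apply_projection(&table_schema, &[2, 0]);
    assert_eq!(output.columns, vec!["date".to_string(), "a".to_string()]);
}
```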
```diff
@@ -64,24 +70,38 @@ pub trait FileSource: Send + Sync {
 ) -> Result<Arc<dyn FileOpener>>;
 /// Any
 fn as_any(&self) -> &dyn Any;

 /// Returns the table schema for this file source.
 ///
-/// This always returns the unprojected schema (the full schema of the data).
+/// This always returns the unprojected schema (the full schema of the data)
+/// without [`Self::projection`] applied.
+///
+/// The output schema of this `FileSource` is this TableSchema
+/// with [`Self::projection`] applied.
```
|
Comment on lines
+79
to
+80
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wonder if we should add a helper method to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is |
```diff
 fn table_schema(&self) -> &crate::table_schema::TableSchema;

 /// Initialize new type with batch size configuration
 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
-/// Returns the filter expression that will be applied during the file scan.
+/// Returns the filter expression that will be applied *during* the file scan.
+///
+/// These expressions are in terms of the unprojected [`Self::table_schema`].
 fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
     None
 }
-/// Return the projection that will be applied to the output stream on top of the table schema.
+/// Return the projection that will be applied to the output stream on top
+/// of [`Self::table_schema`].
 fn projection(&self) -> Option<&ProjectionExprs> {
     None
 }

 /// Return execution plan metrics
 fn metrics(&self) -> &ExecutionPlanMetricsSet;

 /// String representation of file source such as "csv", "json", "parquet"
 fn file_type(&self) -> &str;

 /// Format FileType specific information
 fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
     Ok(())
```
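The key behavior documented above, that a scan-time filter is expressed against the unprojected table schema and may therefore reference columns the projection later drops, can be sketched with a toy row model (plain arrays standing in for record batches and `PhysicalExpr`):

```rust
// Each row carries values for the full (unprojected) table schema: [a, b].
// The filter `b > 10` runs *during* the scan, before projection drops `b`.
fn scan(rows: &[[i64; 2]], projection: &[usize]) -> Vec<Vec<i64>> {
    rows.iter()
        // Filter in terms of the unprojected schema (column index 1 is `b`).
        .filter(|row| row[1] > 10)
        // Only then apply the projection to produce the output schema.
        .map(|row| projection.iter().map(|&i| row[i]).collect())
        .collect()
}

fn main() {
    let rows = [[1, 5], [2, 20], [3, 30]];
    // Project only column `a` (index 0); `b` never appears in the output,
    // yet it still drove the filtering.
    let out = scan(&rows, &[0]);
    assert_eq!(out, vec![vec![2], vec![3]]);
}
```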
```diff
@@ -135,6 +155,19 @@ pub trait FileSource: Send + Sync {
 }

+/// Try to push down filters into this FileSource.
+///
+/// `filters` must be in terms of the unprojected table schema (file schema
+/// plus partition columns), before any projection is applied.
+///
+/// Any filters that this FileSource chooses to evaluate itself should be
+/// returned as `PushedDown::Yes` in the result, along with a FileSource
+/// instance that incorporates those filters. Such filters are logically
+/// applied "during" the file scan, meaning they may refer to columns not
+/// included in the final output projection.
+///
+/// Filters that cannot be pushed down should be marked as `PushedDown::No`,
+/// and will be evaluated by an execution plan after the file source.
+///
+/// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
+///
+/// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
```
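The pushdown contract described above partitions filters into those the source handles and those an upstream plan must re-evaluate. A minimal sketch of that shape, using a hypothetical `try_pushdown` where each "filter" is simplified to the name of the one column it references (not the real `try_pushdown_filters` signature):

```rust
// Toy version of the contract: a source reports, per filter, whether it
// pushed the filter down; unpushed filters must be re-evaluated upstream.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PushedDown {
    Yes,
    No,
}

/// A hypothetical source that can only evaluate filters over columns it knows.
fn try_pushdown(known_columns: &[&str], filters: &[&str]) -> Vec<PushedDown> {
    filters
        .iter()
        .map(|f| {
            if known_columns.contains(f) {
                PushedDown::Yes // source will apply this during the scan
            } else {
                PushedDown::No // an upstream ExecutionPlan must apply this
            }
        })
        .collect()
}

fn main() {
    let result = try_pushdown(&["a", "b"], &["a", "c"]);
    assert_eq!(result, vec![PushedDown::Yes, PushedDown::No]);
}
```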
```diff
@@ -220,7 +253,7 @@ pub trait FileSource: Send + Sync {
     Ok(SortOrderPushdownResult::Unsupported)
 }

-/// Try to push down a projection into a this FileSource.
+/// Try to push down a projection into this FileSource.
 ///
 /// `FileSource` implementations that support projection pushdown should
 /// override this method and return a new `FileSource` instance with the
```
```diff
@@ -55,10 +55,21 @@ use datafusion_physical_plan::{
 use log::{debug, warn};
 use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};

-/// The base configurations for a [`DataSourceExec`], the a physical plan for
-/// any given file format.
+/// [`FileScanConfig`] represents scanning data from a group of files
 ///
-/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from a ``FileScanConfig`.
+/// `FileScanConfig` is used to create a [`DataSourceExec`], the physical plan
+/// for scanning files with a particular file format.
+///
+/// The [`FileSource`] (e.g. `ParquetSource`, `CsvSource`, etc.) is responsible
+/// for creating the actual execution plan to read the files based on a
+/// `FileScanConfig`. Fields in a `FileScanConfig` such as Statistics represent
+/// information about the files **before** any projection or filtering is
```
Comment on lines +65 to +66:

Contributor: I think this is good. I still get confused about stats before vs. after projection / filtering. I do have some caveats: we use an external index to prune row groups, so we can actually get more accurate statistics (min/max, num rows, etc.) for a file by taking pruned row groups into account. It's also useful to project the statistics (or at least only populate them for columns that appear in filters or projections); there is no point in collecting / allocating statistics for columns that will never be used.

Contributor (Author): I agree that what the code currently does is not ideal. However, before I fix it I need to understand what it currently does :) BTW I am actually looking into the same sort of question for the EquivalenceProperties
```diff
+/// applied in the file source.
+///
+/// Use [`FileScanConfigBuilder`] to construct a `FileScanConfig`.
+///
+/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from
+/// a `FileScanConfig`.
+///
 /// # Example
 /// ```
```
```diff
@@ -169,8 +180,11 @@ pub struct FileScanConfig {
 /// Expression adapter used to adapt filters and projections that are pushed down into the scan
 /// from the logical schema to the physical schema of the file.
 pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
-/// Unprojected statistics for the table (file schema + partition columns).
-/// These are projected on-demand via `projected_stats()`.
+/// Statistics for the entire table (file schema + partition columns).
+/// See [`FileScanConfigBuilder::with_statistics`] for more details.
+///
+/// The effective statistics are computed on-demand via
+/// [`ProjectionExprs::project_statistics`].
 ///
 /// Note that this field is pub(crate) because accessing it directly from outside
 /// would be incorrect if there are filters being applied, thus this should be accessed
```
```diff
@@ -283,31 +297,35 @@ impl FileScanConfigBuilder {
     }
 }

-/// Set the maximum number of records to read from this plan. If `None`,
-/// all records after filtering are returned.
+/// Set the maximum number of records to read from this plan.
+///
+/// If `None`, all records after filtering are returned.
 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
     self.limit = limit;
     self
 }

 /// Set whether the limit should be order-sensitive.
 ///
 /// When `true`, files must be read in the exact order specified to produce
 /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`,
-/// DataFusion may reorder file processing for optimization without affecting correctness.
+/// DataFusion may reorder file processing for optimization without
+/// affecting correctness.
 pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
     self.preserve_order = order_sensitive;
     self
 }

 /// Set the file source for scanning files.
 ///
-/// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
-/// after the builder has been created.
+/// This method allows you to change the file source implementation (e.g.
+/// ParquetSource, CsvSource, etc.) after the builder has been created.
 pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
     self.file_source = file_source;
     self
 }

 /// Return the table schema
 pub fn table_schema(&self) -> &SchemaRef {
     self.file_source.table_schema().table_schema()
 }
```
```diff
@@ -332,7 +350,12 @@ impl FileScanConfigBuilder {

 /// Set the columns on which to project the data using column indices.
 ///
-/// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`.
+/// This method attempts to push down the projection to the underlying file
+/// source if supported. If the file source does not support projection
+/// pushdown, an error is returned.
```
Contributor: I think we return

Contributor (Author): I double checked and I think the comment is correct and this code does return an error. A few lines below in this diff:

```rust
if let Some(new_source) = new_source {
    self.file_source = new_source;
} else {
    internal_err!( // <------------ This error
        "FileSource {} does not support projection pushdown",
        self.file_source.file_type()
    )?;
}
```

Contributor: Hmm yeah okay. Don't love that, haha, but I guess we are documenting the current status quo.

Contributor (Author): Yeah, I agree -- I struggled between fixing everything I found questionable and just documenting what it was doing.
```diff
+///
+/// Indexes that are higher than the number of columns of `file_schema`
+/// refer to `table_partition_cols`.
 pub fn with_projection_indices(
     mut self,
     indices: Option<Vec<usize>>,
```
|
@@ -371,8 +394,18 @@ impl FileScanConfigBuilder { | |
| self | ||
| } | ||
|
|
||
| /// Set the estimated overall statistics of the files, taking `filters` into account. | ||
| /// Defaults to [`Statistics::new_unknown`]. | ||
| /// Set the statistics of the files, including partition | ||
| /// columns. Defaults to [`Statistics::new_unknown`]. | ||
| /// | ||
| /// These statistics are for the entire table (file schema + partition | ||
| /// columns) before any projection or filtering is applied. Projections are | ||
| /// applied when statistics are retrieved, and if a filter is present, | ||
| /// [`FileScanConfig::statistics`] will mark the statistics as inexact | ||
| /// (counts are not adjusted). | ||
| /// | ||
| /// Projections and filters may be applied by the file source, either by | ||
| /// [`Self::with_projection_indices`] or a preexisting | ||
| /// [`FileSource::projection`] or [`FileSource::filter`]. | ||
| pub fn with_statistics(mut self, statistics: Statistics) -> Self { | ||
| self.statistics = Some(statistics); | ||
| self | ||
|
|
||
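The rule documented in `with_statistics`, that a filter downgrades statistics to inexact without adjusting counts, can be modeled with a toy `Precision` enum (a simplified stand-in for DataFusion's statistics precision type, not its actual API):

```rust
// Toy model of "filters make statistics inexact": the row count value is
// kept, but its precision is downgraded, mirroring the described behavior.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
}

fn effective_num_rows(num_rows: Precision, has_filter: bool) -> Precision {
    match (num_rows, has_filter) {
        // A filter may remove rows, so an exact count is no longer exact;
        // note the count itself (n) is not adjusted.
        (Precision::Exact(n), true) => Precision::Inexact(n),
        (p, _) => p,
    }
}

fn main() {
    assert_eq!(
        effective_num_rows(Precision::Exact(100), true),
        Precision::Inexact(100)
    );
    assert_eq!(
        effective_num_rows(Precision::Exact(100), false),
        Precision::Exact(100)
    );
}
```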
Contributor: I think this is one of the key distinctions -- there are two relevant schemas and it is important to specify which is being used at any time.