Merged
41 changes: 37 additions & 4 deletions datafusion/datasource/src/file.rs
@@ -46,6 +46,12 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource>

/// file format specific behaviors for elements in [`DataSource`]
///
/// # Schema information
Contributor Author: I think this is one of the key distinctions -- there are two relevant schemas and it is important to specify which is being used at any time.

/// There are two important schemas for a [`FileSource`]:
/// 1. [`Self::table_schema`] -- the schema for the overall "table"
Contributor: Does this include partition columns?

Contributor: I think it would be worth explicitly noting in the table_schema() doc that it includes partition columns, e.g. "the schema for the overall table (file schema + partition columns)". The FileScanConfig docs mention this, but it's easy to miss when reading FileSource alone.

/// 2. The logical output schema, comprised of [`Self::table_schema`] with
/// [`Self::projection`] applied
///
/// See more details on specific implementations:
/// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html)
/// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html)
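The distinction between these two schemas can be sketched in plain Rust (a conceptual illustration using string column names, not DataFusion's actual `SchemaRef`/`ProjectionExprs` types):

```rust
fn main() {
    // Table schema: every column the scan knows about
    // (file columns plus partition columns).
    let table_schema = vec!["c1", "c2", "c3", "date"];

    // Projection: indices into the table schema.
    let projection = [0usize, 3];

    // Logical output schema: the table schema with the projection applied.
    let output_schema: Vec<&str> =
        projection.iter().map(|&i| table_schema[i]).collect();

    assert_eq!(output_schema, ["c1", "date"]);
}
```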
@@ -64,24 +70,38 @@ pub trait FileSource: Send + Sync {
) -> Result<Arc<dyn FileOpener>>;
/// Any
fn as_any(&self) -> &dyn Any;

/// Returns the table schema for this file source.
///
/// This always returns the unprojected schema (the full schema of the data).
/// This always returns the unprojected schema (the full schema of the data)
/// without [`Self::projection`] applied.
///
/// The output schema of this `FileSource` is this TableSchema
/// with [`Self::projection`] applied.
Comment on lines +79 to +80

Contributor: Wonder if we should add a helper method to FileSource that applies the projection?

Contributor Author: There is ProjectionExprs::project_schema which does so. Maybe I could add a link to that.
fn table_schema(&self) -> &crate::table_schema::TableSchema;

/// Initialize new type with batch size configuration
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
/// Returns the filter expression that will be applied during the file scan.

/// Returns the filter expression that will be applied *during* the file scan.
///
/// These expressions are in terms of the unprojected [`Self::table_schema`].
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
None
}
/// Return the projection that will be applied to the output stream on top of the table schema.

/// Return the projection that will be applied to the output stream on top
/// of [`Self::table_schema`].
fn projection(&self) -> Option<&ProjectionExprs> {
None
}
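The interplay between `filter` and `projection` can be modeled with a simplified sketch (real DataFusion filters are `PhysicalExpr`s evaluated over record batches, not closures over rows; this only illustrates the ordering):

```rust
// Simplified model: the filter is evaluated against rows in the unprojected
// table schema, then the projection narrows the row to the output columns.
fn scan_row(
    row: &[i64],
    filter: impl Fn(&[i64]) -> bool,
    projection: &[usize],
) -> Option<Vec<i64>> {
    if !filter(row) {
        return None; // filtered out during the scan
    }
    // Projection is applied after filtering, so the filter may reference
    // columns that the projection drops.
    Some(projection.iter().map(|&i| row[i]).collect())
}

fn main() {
    let projection = [0]; // output only column 0
    let filter = |row: &[i64]| row[2] > 10; // references column 2, not in output
    assert_eq!(scan_row(&[1, 2, 30], &filter, &projection), Some(vec![1]));
    assert_eq!(scan_row(&[4, 5, 6], &filter, &projection), None);
}
```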

/// Return execution plan metrics
fn metrics(&self) -> &ExecutionPlanMetricsSet;

/// String representation of file source such as "csv", "json", "parquet"
fn file_type(&self) -> &str;

/// Format FileType specific information
fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
Ok(())
@@ -135,6 +155,19 @@
}

/// Try to push down filters into this FileSource.
///
/// `filters` must be in terms of the unprojected table schema (file schema
/// plus partition columns), before any projection is applied.
///
/// Any filters that this FileSource chooses to evaluate itself should be
/// returned as `PushedDown::Yes` in the result, along with a FileSource
/// instance that incorporates those filters. Such filters are logically
/// applied "during" the file scan, meaning they may refer to columns not
/// included in the final output projection.
///
/// Filters that cannot be pushed down should be marked as `PushedDown::No`,
/// and will be evaluated by an execution plan after the file source.
///
/// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
///
/// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
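The contract above can be modeled with a small sketch. The `PushedDown` enum here is a local stand-in for DataFusion's type, and the capability check is hypothetical:

```rust
// Local stand-in for DataFusion's pushdown marker.
#[derive(Debug, PartialEq)]
enum PushedDown {
    Yes, // the source evaluates this filter during the scan
    No,  // an execution plan above the source must evaluate it
}

// For each candidate filter, report whether this source absorbed it.
fn classify(filters: &[&str], supported: impl Fn(&str) -> bool) -> Vec<PushedDown> {
    filters
        .iter()
        .map(|f| if supported(f) { PushedDown::Yes } else { PushedDown::No })
        .collect()
}

fn main() {
    // Hypothetical capability: this source only evaluates predicates on "a".
    let supported = |f: &str| f.starts_with("a ");
    let result = classify(&["a > 5", "b LIKE 'x%'"], supported);
    assert_eq!(result, [PushedDown::Yes, PushedDown::No]);
}
```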
@@ -220,7 +253,7 @@ pub trait FileSource: Send + Sync {
Ok(SortOrderPushdownResult::Unsupported)
}

/// Try to push down a projection into a this FileSource.
/// Try to push down a projection into this FileSource.
///
/// `FileSource` implementations that support projection pushdown should
/// override this method and return a new `FileSource` instance with the
59 changes: 46 additions & 13 deletions datafusion/datasource/src/file_scan_config.rs
@@ -55,10 +55,21 @@ use datafusion_physical_plan::{
use log::{debug, warn};
use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};

/// The base configurations for a [`DataSourceExec`], the a physical plan for
/// any given file format.
/// [`FileScanConfig`] represents scanning data from a group of files
///
/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from a ``FileScanConfig`.
/// `FileScanConfig` is used to create a [`DataSourceExec`], the physical plan
/// for scanning files with a particular file format.
///
/// The [`FileSource`] (e.g. `ParquetSource`, `CsvSource`, etc.) is responsible
/// for creating the actual execution plan to read the files based on a
/// `FileScanConfig`. Fields in a `FileScanConfig` such as Statistics represent
/// information about the files **before** any projection or filtering is
Comment on lines +65 to +66

Contributor: I think this is good. I still get confused about stats before vs. after projection / filtering. I do have some caveats: for us we use an external index to prune row groups, so we can actually get more accurate statistics for min/max, num rows, etc. in a file taking into account row groups pruned. It's also useful to project the statistics (or at least only populate them for columns that are in filters or projections); no point in collecting / allocating statistics for columns that will never be used.

Contributor Author: I agree what the code currently does is not ideal. However, before I fix it I need to understand what it currently does :)

BTW I am actually looking into the same sort of question for the EquivalenceProperties.

/// applied in the file source.
///
/// Use [`FileScanConfigBuilder`] to construct a `FileScanConfig`.
///
/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from
/// a `FileScanConfig`.
///
/// # Example
/// ```
@@ -169,8 +180,11 @@ pub struct FileScanConfig {
/// Expression adapter used to adapt filters and projections that are pushed down into the scan
/// from the logical schema to the physical schema of the file.
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
/// Unprojected statistics for the table (file schema + partition columns).
/// These are projected on-demand via `projected_stats()`.
/// Statistics for the entire table (file schema + partition columns).
/// See [`FileScanConfigBuilder::with_statistics`] for more details.
///
/// The effective statistics are computed on-demand via
/// [`ProjectionExprs::project_statistics`].
///
/// Note that this field is pub(crate) because accessing it directly from outside
/// would be incorrect if there are filters being applied, thus this should be accessed
@@ -283,31 +297,35 @@ impl FileScanConfigBuilder {
}
}

/// Set the maximum number of records to read from this plan. If `None`,
/// all records after filtering are returned.
/// Set the maximum number of records to read from this plan.
///
/// If `None`, all records after filtering are returned.
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}

/// Set whether the limit should be order-sensitive.
///
/// When `true`, files must be read in the exact order specified to produce
/// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`,
/// DataFusion may reorder file processing for optimization without affecting correctness.
/// DataFusion may reorder file processing for optimization without
/// affecting correctness.
pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
self.preserve_order = order_sensitive;
self
}

/// Set the file source for scanning files.
///
/// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
/// after the builder has been created.
/// This method allows you to change the file source implementation (e.g.
/// ParquetSource, CsvSource, etc.) after the builder has been created.
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
self.file_source = file_source;
self
}

/// Return the table schema
pub fn table_schema(&self) -> &SchemaRef {
self.file_source.table_schema().table_schema()
}
@@ -332,7 +350,12 @@

/// Set the columns on which to project the data using column indices.
///
/// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`.
/// This method attempts to push down the projection to the underlying file
/// source if supported. If the file source does not support projection
/// pushdown, an error is returned.
Contributor: I think we return Ok(self) -> not an error, an unchanged plan?

Contributor Author: I double checked and I think the comment is correct and this code does return an error.

FileSource::try_pushdown_projection returns Option, but then this method returns an internal error if the file source returns None.

A few lines below in this diff:

        if let Some(new_source) = new_source {
            self.file_source = new_source;
        } else {
            internal_err!(      // <------------ This error
                "FileSource {} does not support projection pushdown",
                self.file_source.file_type()
            )?;
        }

Contributor: Hmm yeah okay. Don't love that haha but I guess we are documenting the current status quo.

Contributor Author: Yeah, I agree -- I struggled between fixing everything I found questionable and just documenting what it was doing.

///
/// Indexes that are higher than the number of columns of `file_schema`
/// refer to `table_partition_cols`.
pub fn with_projection_indices(
mut self,
indices: Option<Vec<usize>>,
@@ -371,8 +394,18 @@
self
}

/// Set the estimated overall statistics of the files, taking `filters` into account.
/// Defaults to [`Statistics::new_unknown`].
/// Set the statistics of the files, including partition
/// columns. Defaults to [`Statistics::new_unknown`].
///
/// These statistics are for the entire table (file schema + partition
/// columns) before any projection or filtering is applied. Projections are
/// applied when statistics are retrieved, and if a filter is present,
/// [`FileScanConfig::statistics`] will mark the statistics as inexact
/// (counts are not adjusted).
///
/// Projections and filters may be applied by the file source, either by
/// [`Self::with_projection_indices`] or a preexisting
/// [`FileSource::projection`] or [`FileSource::filter`].
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
self.statistics = Some(statistics);
self
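The "inexact when filtered" behavior can be sketched with a simplified model (DataFusion's actual `Statistics` type also carries per-column statistics and an `Absent` precision; this only shows the row-count downgrade):

```rust
// Simplified model of exact vs. inexact statistics.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
}

// If a filter is present, the row count becomes an upper bound: the scan
// may return fewer rows, so an exact statistic is downgraded to inexact.
// Note the count itself is not adjusted.
fn effective_num_rows(num_rows: Precision, has_filter: bool) -> Precision {
    match (num_rows, has_filter) {
        (Precision::Exact(n), true) => Precision::Inexact(n),
        (p, _) => p,
    }
}

fn main() {
    assert_eq!(effective_num_rows(Precision::Exact(100), false), Precision::Exact(100));
    assert_eq!(effective_num_rows(Precision::Exact(100), true), Precision::Inexact(100));
}
```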
6 changes: 6 additions & 0 deletions datafusion/datasource/src/source.rs
@@ -178,7 +178,13 @@ pub trait DataSource: Send + Sync + Debug {
&self,
_projection: &ProjectionExprs,
) -> Result<Option<Arc<dyn DataSource>>>;

/// Try to push down filters into this DataSource.
///
/// These filters are in terms of the output schema of this DataSource (e.g.
/// [`Self::eq_properties`] and output of any projections pushed into the
/// source), not the original table schema.
///
/// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
///
/// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
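The schema-base difference between the two pushdown levels can be illustrated with string column names (a hypothetical sketch, not DataFusion code): a filter pushed into a `DataSource` sees the projected output schema, while a `FileSource` filter sees the full table schema.

```rust
fn main() {
    // Table schema known to the FileSource (file columns + partition column).
    let table_schema = ["c1", "c2", "date"];
    // A projection already pushed into the source keeps only these columns.
    let projection = [0usize, 2];
    // Output schema of the DataSource after that projection.
    let output_schema: Vec<&str> =
        projection.iter().map(|&i| table_schema[i]).collect();

    // A DataSource-level filter must reference the output schema ("c1", "date")...
    assert_eq!(output_schema, ["c1", "date"]);
    // ...while a FileSource-level filter may reference "c2", which exists in
    // the table schema even though the projection drops it from the output.
    assert!(table_schema.contains(&"c2") && !output_schema.contains(&"c2"));
}
```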
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/execution_plan.rs
@@ -577,6 +577,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}

/// Handle the result of a child pushdown.
///
/// This method is called as we recurse back up the plan tree after pushing
/// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
/// It allows the current node to process the results of filter pushdown from