From 8b74a787cb26338a141f1ce366e0f401d42098b2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 6 Feb 2026 08:50:49 -0500 Subject: [PATCH 1/2] Better document FileScanConfig::Statistics --- datafusion/datasource/src/file.rs | 30 +++++++++- datafusion/datasource/src/file_scan_config.rs | 59 +++++++++++++++---- datafusion/datasource/src/source.rs | 6 ++ 3 files changed, 79 insertions(+), 16 deletions(-) diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index c6282c3c7c14e..212d2705236bb 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -46,6 +46,12 @@ pub fn as_file_source(source: T) -> Arc /// file format specific behaviors for elements in [`DataSource`] /// +/// # Schema information +/// There are two important schemas for a [`FileSource`]: +/// 1. [`Self::table_schema`] -- the schema for the overall "table" +/// 2. The logical output schema, comprised of [`Self::table_schema`] with +/// [`Self::projection`] applied +/// /// See more details on specific implementations: /// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html) /// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html) @@ -64,24 +70,38 @@ pub trait FileSource: Send + Sync { ) -> Result>; /// Any fn as_any(&self) -> &dyn Any; + /// Returns the table schema for this file source. /// - /// This always returns the unprojected schema (the full schema of the data). + /// This always returns the unprojected schema (the full schema of the data) + /// without [`Self::projection`] applied. + /// + /// The output schema of this `FileSource` is this TableSchema + /// with [`Self::projection`] applied. fn table_schema(&self) -> &crate::table_schema::TableSchema; + /// Initialize new type with batch size configuration fn with_batch_size(&self, batch_size: usize) -> Arc; - /// Returns the filter expression that will be applied during the file scan. + + /// Returns the filter expression that will be applied *during* the file scan. + /// + /// These expressions are in terms of the unprojected [`Self::table_schema`]. fn filter(&self) -> Option> { None } - /// Return the projection that will be applied to the output stream on top of the table schema. + + /// Return the projection that will be applied to the output stream on top + /// of [`Self::table_schema`]. fn projection(&self) -> Option<&ProjectionExprs> { None } + /// Return execution plan metrics fn metrics(&self) -> &ExecutionPlanMetricsSet; + /// String representation of file source such as "csv", "json", "parquet" fn file_type(&self) -> &str; + /// Format FileType specific information fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result { Ok(()) @@ -135,6 +155,10 @@ pub trait FileSource: Send + Sync { } /// Try to push down filters into this FileSource. + /// + /// `filters` must be in terms of the unprojected table schema (file schema + /// plus partition columns). + /// /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details. /// /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index fe78c0e5262a4..80722755e6ddc 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -55,10 +55,21 @@ use datafusion_physical_plan::{ use log::{debug, warn}; use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc}; -/// The base configurations for a [`DataSourceExec`], the a physical plan for -/// any given file format. +/// [`FileScanConfig`] represents scanning data from a group of files /// -/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from a ``FileScanConfig`. +/// `FileScanConfig` is used to create a [`DataSourceExec`], the physical plan +/// for scanning files with a particular file format. +/// +/// The [`FileSource`] (e.g. `ParquetSource`, `CsvSource`, etc.) is responsible +/// for creating the actual execution plan to read the files based on a +/// `FileScanConfig`. Fields in a `FileScanConfig` such as Statistics represent +/// information about the files **before** any projection or filtering is +/// applied in the file source. +/// +/// Use [`FileScanConfigBuilder`] to construct a `FileScanConfig`. +/// +/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from +/// a `FileScanConfig`. /// /// # Example /// ``` @@ -169,8 +180,11 @@ pub struct FileScanConfig { /// Expression adapter used to adapt filters and projections that are pushed down into the scan /// from the logical schema to the physical schema of the file. pub expr_adapter_factory: Option>, - /// Unprojected statistics for the table (file schema + partition columns). - /// These are projected on-demand via `projected_stats()`. + /// Statistics for the entire table (file schema + partition columns). + /// See [`FileScanConfigBuilder::with_statistics`] for more details. + /// + /// The effective statistics are computed on-demand via + /// [`ProjectionExprs::project_statistics`]. /// /// Note that this field is pub(crate) because accessing it directly from outside /// would be incorrect if there are filters being applied, thus this should be accessed @@ -283,17 +297,20 @@ impl FileScanConfigBuilder { } } - /// Set the maximum number of records to read from this plan. If `None`, - /// all records after filtering are returned. + /// Set the maximum number of records to read from this plan. + /// + /// If `None`, all records after filtering are returned. pub fn with_limit(mut self, limit: Option) -> Self { self.limit = limit; self } /// Set whether the limit should be order-sensitive. + /// /// When `true`, files must be read in the exact order specified to produce /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`, - /// DataFusion may reorder file processing for optimization without affecting correctness. + /// DataFusion may reorder file processing for optimization without + /// affecting correctness. pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self { self.preserve_order = order_sensitive; self @@ -301,13 +318,14 @@ impl FileScanConfigBuilder { /// Set the file source for scanning files. /// - /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.) - /// after the builder has been created. + /// This method allows you to change the file source implementation (e.g. + /// ParquetSource, CsvSource, etc.) after the builder has been created. pub fn with_source(mut self, file_source: Arc) -> Self { self.file_source = file_source; self } + /// Return the table schema pub fn table_schema(&self) -> &SchemaRef { self.file_source.table_schema().table_schema() } @@ -332,7 +350,12 @@ impl FileScanConfigBuilder { /// Set the columns on which to project the data using column indices. /// - /// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`. + /// This method attempts to push down the projection to the underlying file + /// source if supported. If the file source does not support projection + /// pushdown, an error is returned. + /// + /// Indexes that are higher than the number of columns of `file_schema` + /// refer to `table_partition_cols`. pub fn with_projection_indices( mut self, indices: Option>, @@ -371,8 +394,18 @@ impl FileScanConfigBuilder { self } - /// Set the estimated overall statistics of the files, taking `filters` into account. - /// Defaults to [`Statistics::new_unknown`]. + /// Set the statistics of the files, including partition + /// columns. Defaults to [`Statistics::new_unknown`]. + /// + /// These statistics are for the entire table (file schema + partition + /// columns) before any projection or filtering is applied. Projections are + /// applied when statistics are retrieved, and if a filter is present, + /// [`FileScanConfig::statistics`] will mark the statistics as inexact + /// (counts are not adjusted). + /// + /// Projections and filters may be applied by the file source, either by + /// [`Self::with_projection_indices`] or a preexisting + /// [`FileSource::projection`] or [`FileSource::filter`]. pub fn with_statistics(mut self, statistics: Statistics) -> Self { self.statistics = Some(statistics); self diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index de18b6be2235f..71ddac84a8f08 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -178,7 +178,13 @@ pub trait DataSource: Send + Sync + Debug { &self, _projection: &ProjectionExprs, ) -> Result>>; + /// Try to push down filters into this DataSource. + /// + /// These filters are in terms of the output schema of this DataSource (e.g. + /// [`Self::eq_properties`] and output of any projections pushed into the + /// source), not the original table schema. + /// /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details. /// /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result From 02e7617148ea29eeb79126f4a09e074c102b2caa Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 6 Feb 2026 12:09:56 -0500 Subject: [PATCH 2/2] Add more clarification --- datafusion/datasource/src/file.rs | 13 +++++++++++-- datafusion/physical-plan/src/execution_plan.rs | 1 + 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 212d2705236bb..b41a456f1f8c2 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -157,7 +157,16 @@ pub trait FileSource: Send + Sync { /// Try to push down filters into this FileSource. /// /// `filters` must be in terms of the unprojected table schema (file schema - /// plus partition columns). + /// plus partition columns), before any projection is applied. + /// + /// Any filters that this FileSource chooses to evaluate itself should be + /// returned as `PushedDown::Yes` in the result, along with a FileSource + /// instance that incorporates those filters. Such filters are logically + /// applied "during" the file scan, meaning they may refer to columns not + /// included in the final output projection. + /// + /// Filters that cannot be pushed down should be marked as `PushedDown::No`, + /// and will be evaluated by an execution plan after the file source. /// /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details. /// @@ -244,7 +253,7 @@ pub trait FileSource: Send + Sync { Ok(SortOrderPushdownResult::Unsupported) } - /// Try to push down a projection into a this FileSource. + /// Try to push down a projection into this FileSource. /// /// `FileSource` implementations that support projection pushdown should /// override this method and return a new `FileSource` instance with the diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 52f4829127651..43cce0e5ea421 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -577,6 +577,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { } /// Handle the result of a child pushdown. + /// /// This method is called as we recurse back up the plan tree after pushing /// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`]. /// It allows the current node to process the results of filter pushdown from