diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs index 351eaff21e9..63f6deb8aa3 100644 --- a/rust/datafusion/src/datasource/csv.rs +++ b/rust/datafusion/src/datasource/csv.rs @@ -39,6 +39,7 @@ use std::string::String; use std::sync::Arc; use crate::datasource::TableProvider; +use crate::datasource::datasource::Statistics; use crate::error::{DataFusionError, Result}; use crate::physical_plan::csv::CsvExec; pub use crate::physical_plan::csv::CsvReadOptions; @@ -52,6 +53,7 @@ pub struct CsvFile { has_header: bool, delimiter: u8, file_extension: String, + statistics: Option<Statistics>, } impl CsvFile { @@ -75,6 +77,7 @@ impl CsvFile { has_header: options.has_header, delimiter: options.delimiter, file_extension: String::from(options.file_extension), + statistics: None, }) } } @@ -104,4 +107,8 @@ impl TableProvider for CsvFile { batch_size, )?)) } + + fn statistics(&self) -> Option<Statistics> { + self.statistics.clone() + } } diff --git a/rust/datafusion/src/datasource/datasource.rs b/rust/datafusion/src/datasource/datasource.rs index e7371ea5a8a..70af2d30fa2 100644 --- a/rust/datafusion/src/datasource/datasource.rs +++ b/rust/datafusion/src/datasource/datasource.rs @@ -24,6 +24,15 @@ use crate::arrow::datatypes::SchemaRef; use crate::error::Result; use crate::physical_plan::ExecutionPlan; +/// The table statistics +#[derive(Clone, Debug)] +pub struct Statistics { + /// The number of table rows + pub num_rows: i64, + /// total byte of the table rows + pub total_byte_size: i64, +} + /// Source table pub trait TableProvider { /// Returns the table provider as [`Any`](std::any::Any) so that it can be @@ -39,4 +48,8 @@ pub trait TableProvider { projection: &Option<Vec<usize>>, batch_size: usize, ) -> Result<Arc<dyn ExecutionPlan>>; + + /// Returns the table Statistics + /// Statistics should be optional because not all data sources can provide statistics. 
+ fn statistics(&self) -> Option<Statistics>; } diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs index 8fa140b02c8..7b51f33079e 100644 --- a/rust/datafusion/src/datasource/memory.rs +++ b/rust/datafusion/src/datasource/memory.rs @@ -26,6 +26,7 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use crate::datasource::TableProvider; +use crate::datasource::datasource::Statistics; use crate::error::{DataFusionError, Result}; use crate::physical_plan::common; use crate::physical_plan::memory::MemoryExec; @@ -35,6 +36,7 @@ use crate::physical_plan::ExecutionPlan; pub struct MemTable { schema: SchemaRef, batches: Vec<Vec<RecordBatch>>, + statistics: Option<Statistics>, } impl MemTable { @@ -48,6 +50,7 @@ Ok(Self { schema, batches: partitions, + statistics: None, }) } else { Err(DataFusionError::Plan( @@ -132,6 +135,10 @@ impl TableProvider for MemTable { projection.clone(), )?)) } + + fn statistics(&self) -> Option<Statistics> { + self.statistics.clone() + } } #[cfg(test)] diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index be65e638326..1ba99a11848 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use arrow::datatypes::*; use crate::datasource::TableProvider; +use crate::datasource::datasource::Statistics; use crate::error::Result; use crate::physical_plan::parquet::ParquetExec; use crate::physical_plan::ExecutionPlan; @@ -32,6 +33,7 @@ use crate::physical_plan::ExecutionPlan; pub struct ParquetTable { path: String, schema: SchemaRef, + statistics: Option<Statistics>, } impl ParquetTable { @@ -39,9 +41,11 @@ pub fn try_new(path: &str) -> Result<Self> { let parquet_exec = ParquetExec::try_new(path, None, 0)?; let schema = parquet_exec.schema(); + let statistics = parquet_exec.statistics(); Ok(Self { path: path.to_string(), schema, + statistics, }) } } @@ -69,6 +73,10 @@ impl 
TableProvider for ParquetTable { batch_size, )?)) } + + fn statistics(&self) -> Option<Statistics> { + self.statistics.clone() + } } #[cfg(test)] @@ -139,6 +147,16 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_statistics() -> Result<()> { + let table = load_table("alltypes_plain.parquet")?; + let statistics = table.statistics().unwrap(); + assert_eq!(8, statistics.num_rows); + assert_eq!(671, statistics.total_byte_size); + + Ok(()) + } + #[tokio::test] async fn read_bool_alltypes_plain_parquet() -> Result<()> { let table = load_table("alltypes_plain.parquet")?; diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs index 19d2aa38266..e39c994d456 100644 --- a/rust/datafusion/src/physical_plan/parquet.rs +++ b/rust/datafusion/src/physical_plan/parquet.rs @@ -30,7 +30,7 @@ use crate::physical_plan::{common, Partitioning}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; -use parquet::file::reader::SerializedFileReader; +use parquet::file::reader::{SerializedFileReader, FileReader}; use crossbeam::channel::{bounded, Receiver, RecvError, Sender}; use fmt::Debug; @@ -38,6 +38,7 @@ use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; use async_trait::async_trait; use futures::stream::Stream; +use crate::datasource::datasource::Statistics; /// Execution plan for scanning a Parquet file #[derive(Debug, Clone)] @@ -99,6 +100,38 @@ impl ParquetExec { batch_size, } } + + /// Get the statistics from parquet file format + pub fn statistics(&self) -> Option<Statistics> { + let mut num_rows = 0; + let mut total_byte_size = 0; + for i in 0..self.filenames.len() { + let file = File::open(&self.filenames[i]).ok()?; + let file_reader = Arc::new(SerializedFileReader::new(file).ok()?); + num_rows += file_reader.metadata().file_metadata().num_rows() as i64; + for g in file_reader.metadata().row_groups().iter() { + total_byte_size += g.total_byte_size() as i64; 
+ } + } + + if num_rows >= i64::MAX { + num_rows = i64::MAX; + } + if total_byte_size >= i64::MAX { + total_byte_size = i64::MAX; + } + if num_rows <= 0 { + num_rows = 0; + } + if total_byte_size <= 0 { + total_byte_size = 0; + } + + Option::from(Statistics{ + num_rows, + total_byte_size, + }) + } } #[async_trait] diff --git a/rust/datafusion/tests/dataframe.rs b/rust/datafusion/tests/dataframe.rs index d17deffc5ac..d88c96efe66 100644 --- a/rust/datafusion/tests/dataframe.rs +++ b/rust/datafusion/tests/dataframe.rs @@ -24,6 +24,7 @@ use datafusion::datasource::TableProvider; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::ExecutionContext; +use datafusion::datasource::datasource::Statistics; use datafusion::logical_plan::{col, LogicalPlan, LogicalPlanBuilder}; use datafusion::physical_plan::{ ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, @@ -145,6 +146,10 @@ impl TableProvider for CustomTableProvider { projection: projection.clone(), })) } + + fn statistics(&self) -> Option<Statistics> { + None + } } #[tokio::test]