Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions rust/datafusion/src/datasource/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use std::string::String;
use std::sync::Arc;

use crate::datasource::TableProvider;
use crate::datasource::datasource::Statistics;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::csv::CsvExec;
pub use crate::physical_plan::csv::CsvReadOptions;
Expand All @@ -52,6 +53,7 @@ pub struct CsvFile {
has_header: bool,
delimiter: u8,
file_extension: String,
statistics: Option<Statistics>,
}

impl CsvFile {
Expand All @@ -75,6 +77,7 @@ impl CsvFile {
has_header: options.has_header,
delimiter: options.delimiter,
file_extension: String::from(options.file_extension),
statistics: None,
})
}
}
Expand Down Expand Up @@ -104,4 +107,8 @@ impl TableProvider for CsvFile {
batch_size,
)?))
}

/// Returns the table statistics, if any have been computed for this CSV file.
fn statistics(&self) -> Option<Statistics> {
    self.statistics.as_ref().cloned()
}
}
13 changes: 13 additions & 0 deletions rust/datafusion/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::physical_plan::ExecutionPlan;

/// Statistics about a table: its row count and total size in bytes.
///
/// Statistics are optional because not every data source can provide them
/// (see [`TableProvider::statistics`]).
#[derive(Clone, Debug)]
pub struct Statistics {
    /// The number of rows in the table
    pub num_rows: i64,
    /// The total size of the table rows, in bytes
    pub total_byte_size: i64,
}

/// Source table
pub trait TableProvider {
/// Returns the table provider as [`Any`](std::any::Any) so that it can be
Expand All @@ -39,4 +48,8 @@ pub trait TableProvider {
projection: &Option<Vec<usize>>,
batch_size: usize,
) -> Result<Arc<dyn ExecutionPlan>>;

/// Returns the table Statistics
/// Statistics should be optional because not all data sources can provide statistics.
fn statistics(&self) -> Option<Statistics>;
}
7 changes: 7 additions & 0 deletions rust/datafusion/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;

use crate::datasource::TableProvider;
use crate::datasource::datasource::Statistics;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::common;
use crate::physical_plan::memory::MemoryExec;
Expand All @@ -35,6 +36,7 @@ use crate::physical_plan::ExecutionPlan;
pub struct MemTable {
schema: SchemaRef,
batches: Vec<Vec<RecordBatch>>,
statistics: Option<Statistics>,
}

impl MemTable {
Expand All @@ -48,6 +50,7 @@ impl MemTable {
Ok(Self {
schema,
batches: partitions,
statistics: None,
})
} else {
Err(DataFusionError::Plan(
Expand Down Expand Up @@ -132,6 +135,10 @@ impl TableProvider for MemTable {
projection.clone(),
)?))
}

/// Returns the statistics for this in-memory table, if any were recorded.
fn statistics(&self) -> Option<Statistics> {
    self.statistics.to_owned()
}
}

#[cfg(test)]
Expand Down
18 changes: 18 additions & 0 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Arc;
use arrow::datatypes::*;

use crate::datasource::TableProvider;
use crate::datasource::datasource::Statistics;
use crate::error::Result;
use crate::physical_plan::parquet::ParquetExec;
use crate::physical_plan::ExecutionPlan;
Expand All @@ -32,16 +33,19 @@ use crate::physical_plan::ExecutionPlan;
pub struct ParquetTable {
path: String,
schema: SchemaRef,
statistics: Option<Statistics>,
}

impl ParquetTable {
/// Attempt to initialize a new `ParquetTable` from a file path.
pub fn try_new(path: &str) -> Result<Self> {
let parquet_exec = ParquetExec::try_new(path, None, 0)?;
let schema = parquet_exec.schema();
let statistics = parquet_exec.statistics();
Ok(Self {
path: path.to_string(),
schema,
statistics,
})
}
}
Expand Down Expand Up @@ -69,6 +73,10 @@ impl TableProvider for ParquetTable {
batch_size,
)?))
}

/// Returns the statistics gathered from the parquet metadata at construction
/// time, if they were available.
fn statistics(&self) -> Option<Statistics> {
    self.statistics.as_ref().map(Clone::clone)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -139,6 +147,16 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn read_statistics() -> Result<()> {
    // `alltypes_plain.parquet` is a fixed fixture: 8 rows, 671 bytes of
    // row-group data, so the statistics derived from its footer are stable.
    let table = load_table("alltypes_plain.parquet")?;
    let stats = table
        .statistics()
        .expect("parquet tables should expose statistics");
    assert_eq!(stats.num_rows, 8);
    assert_eq!(stats.total_byte_size, 671);
    Ok(())
}

#[tokio::test]
async fn read_bool_alltypes_plain_parquet() -> Result<()> {
let table = load_table("alltypes_plain.parquet")?;
Expand Down
35 changes: 34 additions & 1 deletion rust/datafusion/src/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ use crate::physical_plan::{common, Partitioning};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use parquet::file::reader::SerializedFileReader;
use parquet::file::reader::{SerializedFileReader, FileReader};

use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
use fmt::Debug;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};

use async_trait::async_trait;
use futures::stream::Stream;
use crate::datasource::datasource::Statistics;

/// Execution plan for scanning a Parquet file
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -99,6 +100,38 @@ impl ParquetExec {
batch_size,
}
}

/// Get the statistics from parquet file format
pub fn statistics(&self) -> Option<Statistics> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By doing this you are parsing the parquet file headers a second time when they have already been parsed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, for clarity, the ParquetReader already performs this code. You could expose the metadata produced in the first pass and then you don't need to reimplement this logic (which has to read all the files again).

let mut num_rows = 0;
let mut total_byte_size = 0;
for i in 0..self.filenames.len() {
let file = File::open(&self.filenames[i]).ok()?;
let file_reader = Arc::new(SerializedFileReader::new(file).ok()?);
num_rows += file_reader.metadata().file_metadata().num_rows() as i64;
for g in file_reader.metadata().row_groups().iter() {
total_byte_size += g.total_byte_size() as i64;
}
}

if num_rows >= i64::MAX {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could use std::cmp::min and std::cmp::max instead of these if statements to find min/max values

num_rows = i64::MAX;
}
if total_byte_size >= i64::MAX {
total_byte_size = i64::MAX;
}
if num_rows <= 0 {
num_rows = 0;
}
if total_byte_size <= 0 {
total_byte_size = 0;
}

Option::from(Statistics{
num_rows,
total_byte_size,
})
}
}

#[async_trait]
Expand Down
5 changes: 5 additions & 0 deletions rust/datafusion/tests/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};

use datafusion::execution::context::ExecutionContext;
use datafusion::datasource::datasource::Statistics;
use datafusion::logical_plan::{col, LogicalPlan, LogicalPlanBuilder};
use datafusion::physical_plan::{
ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
Expand Down Expand Up @@ -145,6 +146,10 @@ impl TableProvider for CustomTableProvider {
projection: projection.clone(),
}))
}

    // This test-only provider computes no statistics; returning None is the
    // documented way for a TableProvider to signal "statistics unavailable".
    fn statistics(&self) -> Option<Statistics> {
        None
    }
}

#[tokio::test]
Expand Down