Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rust/datafusion/src/datasource/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct CsvFile {
has_header: bool,
delimiter: u8,
file_extension: String,
statistics: Option<Statistics>,
statistics: Statistics,
}

impl CsvFile {
Expand All @@ -77,7 +77,7 @@ impl CsvFile {
has_header: options.has_header,
delimiter: options.delimiter,
file_extension: String::from(options.file_extension),
statistics: None,
statistics: Statistics::default(),
})
}
}
Expand Down Expand Up @@ -108,7 +108,7 @@ impl TableProvider for CsvFile {
)?))
}

fn statistics(&self) -> Option<Statistics> {
fn statistics(&self) -> Statistics {
self.statistics.clone()
}
}
8 changes: 4 additions & 4 deletions rust/datafusion/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use crate::physical_plan::ExecutionPlan;

/// These table statistics are estimates.
/// They cannot be used directly for precise computation.
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct Statistics {
/// The number of table rows
pub num_rows: usize,
pub num_rows: Option<usize>,
/// The total size of the table rows, in bytes
pub total_byte_size: usize,
pub total_byte_size: Option<usize>,
}

/// Source table
Expand All @@ -52,5 +52,5 @@ pub trait TableProvider {

/// Returns the table Statistics
/// Individual statistics fields are optional because not all data sources can provide statistics.
fn statistics(&self) -> Option<Statistics>;
fn statistics(&self) -> Statistics;
}
23 changes: 16 additions & 7 deletions rust/datafusion/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,28 @@ use crate::physical_plan::ExecutionPlan;
pub struct MemTable {
schema: SchemaRef,
batches: Vec<Vec<RecordBatch>>,
statistics: Option<Statistics>,
statistics: Statistics,
}

impl MemTable {
/// Create a new in-memory table from the provided schema and record batches
pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
if partitions.iter().all(|partition| {
partition
if partitions
.iter()
.flatten()
.all(|batches| batches.schema() == schema)
{
let num_rows: usize = partitions
.iter()
.all(|batches| batches.schema().as_ref() == schema.as_ref())
}) {
.flat_map(|batches| batches.iter().map(RecordBatch::num_rows))
.sum();
Ok(Self {
schema,
batches: partitions,
statistics: None,
statistics: Statistics {
num_rows: Some(num_rows),
total_byte_size: None,
},
})
} else {
Err(DataFusionError::Plan(
Expand Down Expand Up @@ -136,7 +143,7 @@ impl TableProvider for MemTable {
)?))
}

fn statistics(&self) -> Option<Statistics> {
fn statistics(&self) -> Statistics {
self.statistics.clone()
}
}
Expand Down Expand Up @@ -167,6 +174,8 @@ mod tests {

let provider = MemTable::try_new(schema, vec![vec![batch]])?;

assert_eq!(provider.statistics().num_rows, Some(3));

// scan with projection
let exec = provider.scan(&Some(vec![2, 1]), 1024)?;
let mut it = exec.execute(0).await?;
Expand Down
6 changes: 3 additions & 3 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::physical_plan::ExecutionPlan;
pub struct ParquetTable {
path: String,
schema: SchemaRef,
statistics: Option<Statistics>,
statistics: Statistics,
}

impl ParquetTable {
Expand All @@ -44,7 +44,7 @@ impl ParquetTable {
Ok(Self {
path: path.to_string(),
schema,
statistics: None,
statistics: Statistics::default(),
})
}
}
Expand Down Expand Up @@ -73,7 +73,7 @@ impl TableProvider for ParquetTable {
)?))
}

fn statistics(&self) -> Option<Statistics> {
fn statistics(&self) -> Statistics {
self.statistics.clone()
}
}
Expand Down
4 changes: 2 additions & 2 deletions rust/datafusion/tests/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ impl TableProvider for CustomTableProvider {
}))
}

fn statistics(&self) -> Option<Statistics> {
None
fn statistics(&self) -> Statistics {
Statistics::default()
}
}

Expand Down