diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 40b4ec61dc102..ae9957343b267 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -21,11 +21,11 @@ use std::any::Any; use std::sync::Arc; use crate::memory::MemoryStream; -use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; use crate::{ execution_plan::{Boundedness, EmissionType}, DisplayFormatType, ExecutionPlan, Partitioning, }; +use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -34,6 +34,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; use crate::execution_plan::SchedulingType; +use datafusion_common::stats::Precision; use log::trace; /// Execution plan for empty relation with produce_one_row=false @@ -97,8 +98,11 @@ impl DisplayAs for EmptyExec { write!(f, "EmptyExec") } DisplayFormatType::TreeRender => { - // TODO: collect info - write!(f, "") + write!( + f, + "EmptyExec: schema={}, partitions={}", + self.schema, self.partitions + ) } } } @@ -165,23 +169,28 @@ impl ExecutionPlan for EmptyExec { ); } } + // Build explicit stats: exact zero rows and bytes, with unknown columns + let mut stats = Statistics::default() + .with_num_rows(Precision::Exact(0)) + .with_total_byte_size(Precision::Exact(0)); + + // Add unknown column stats for each field in schema + for _field in self.schema.fields() { + stats = stats.add_column_statistics( + datafusion_common::stats::ColumnStatistics::new_unknown(), + ); + } - let batch = self - .data() - .expect("Create empty RecordBatch should not fail"); - Ok(common::compute_record_batch_statistics( - &[batch], - &self.schema, - None, - )) + Ok(stats) } } #[cfg(test)] mod tests { use super::*; - use crate::test; use crate::with_new_children_if_necessary; + use crate::{common, test}; + use arrow_schema::Schema; #[tokio::test] async fn empty() -> Result<()> { @@ -229,4 +238,41 @@ mod tests { assert!(empty.execute(20, task_ctx).is_err()); Ok(()) } + + #[test] + fn empty_partition_statistics_explicit_zero() -> Result<()> { + let schema: Arc = Arc::new(Schema::empty()); + let exec1: EmptyExec = EmptyExec::new(Arc::clone(&schema)); + // default partition = 1 + + // global stats + let stats_all_1: Statistics = exec1.partition_statistics(None)?; + assert_eq!(stats_all_1.num_rows, Precision::Exact(0)); + assert_eq!(stats_all_1.total_byte_size, Precision::Exact(0)); + assert_eq!(stats_all_1.column_statistics.len(), schema.fields().len()); + + // partition 0 + let stats0_1: Statistics = exec1.partition_statistics(Some(0))?; + assert_eq!(stats0_1.num_rows, Precision::Exact(0)); + assert_eq!(stats0_1.total_byte_size, Precision::Exact(0)); + assert_eq!(stats0_1.column_statistics.len(), schema.fields().len()); + + // invalid partition for default + assert!(exec1.partition_statistics(Some(1)).is_err()); + + // Now with 2 partitions + let exec2: EmptyExec = EmptyExec::new(Arc::clone(&schema)).with_partitions(2); + + // valid partitions 0 and 1 + for part in 0..2 { + let stats = exec2.partition_statistics(Some(part))?; + assert_eq!(stats.num_rows, Precision::Exact(0)); + assert_eq!(stats.total_byte_size, Precision::Exact(0)); + assert_eq!(stats.column_statistics.len(), schema.fields().len()); + } + + // invalid partition 2 + assert!(exec2.partition_statistics(Some(2)).is_err()); + Ok(()) + } }