diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 432d109b2235c..36fffe5ac4d95 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -22,8 +22,9 @@ use std::fs::File;
 use std::sync::Arc;
 
 use arrow_array::{
-    make_array, Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, Float64Array,
-    Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt64Array,
+    make_array, Array, ArrayRef, BooleanArray, Decimal128Array, FixedSizeBinaryArray,
+    Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch,
+    StringArray, UInt64Array,
 };
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::datasource::physical_plan::parquet::{
@@ -33,7 +34,7 @@ use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderB
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::{EnabledStatistics, WriterProperties};
 
-use crate::parquet::Scenario;
+use crate::parquet::{struct_array, Scenario};
 
 use super::make_test_file_rg;
 
@@ -73,6 +74,28 @@ pub fn parquet_file_one_column(
     no_null_values_start: i64,
     no_null_values_end: i64,
     row_per_group: usize,
+) -> ParquetRecordBatchReaderBuilder<File> {
+    parquet_file_one_column_stats(
+        num_null,
+        no_null_values_start,
+        no_null_values_end,
+        row_per_group,
+        EnabledStatistics::Chunk,
+    )
+}
+
+// Create a parquet file with one column of data type i64
+// Data of the file includes:
+//   . Number of null rows is the given num_null
+//   . There are non-null values in the range [no_null_values_start, no_null_values_end], one value per row
+//   . The file is divided into row groups of size row_per_group
+//   . Statistics are enabled/disabled based on the given enable_stats
+pub fn parquet_file_one_column_stats(
+    num_null: usize,
+    no_null_values_start: i64,
+    no_null_values_end: i64,
+    row_per_group: usize,
+    enable_stats: EnabledStatistics,
 ) -> ParquetRecordBatchReaderBuilder<File> {
     let mut output_file = tempfile::Builder::new()
         .prefix("parquert_statistics_test")
@@ -82,7 +105,7 @@ pub fn parquet_file_one_column(
     let props = WriterProperties::builder()
         .set_max_row_group_size(row_per_group)
-        .set_statistics_enabled(EnabledStatistics::Chunk)
+        .set_statistics_enabled(enable_stats)
         .build();
 
     let batches = vec![make_int64_batches_with_null(
@@ -107,40 +130,56 @@ pub fn parquet_file_one_column(
     ArrowReaderBuilder::try_new(file).unwrap()
 }
 
-// Create a parquet file with many columns each has different data type
-// - Data types are specified by the given scenario
-// - Row group sizes are withe the same or different depending on the provided row_per_group & data created in the scenario
-pub async fn parquet_file_many_columns(
-    scenario: super::Scenario,
+/// Defines what data to create in a parquet file
+#[derive(Debug, Clone, Copy)]
+struct TestReader {
+    /// What data to create in the parquet file
+    scenario: Scenario,
+    /// Number of rows per row group
    row_per_group: usize,
-) -> ParquetRecordBatchReaderBuilder<File> {
-    let file = make_test_file_rg(scenario, row_per_group).await;
+}
 
-    // open the file & get the reader
-    let file = file.reopen().unwrap();
-    ArrowReaderBuilder::try_new(file).unwrap()
+impl TestReader {
+    /// Create a parquet file with the specified data, and return a
+    /// ParquetRecordBatchReaderBuilder opened to that file.
+    async fn build(self) -> ParquetRecordBatchReaderBuilder<File> {
+        let TestReader {
+            scenario,
+            row_per_group,
+        } = self;
+        let file = make_test_file_rg(scenario, row_per_group).await;
+
+        // open the file & get the reader
+        let file = file.reopen().unwrap();
+        ArrowReaderBuilder::try_new(file).unwrap()
+    }
 }
 
+/// Defines a test case for statistics extraction
 struct Test {
+    /// The parquet file reader
     reader: ParquetRecordBatchReaderBuilder<File>,
     expected_min: ArrayRef,
     expected_max: ArrayRef,
     expected_null_counts: UInt64Array,
     expected_row_counts: UInt64Array,
+    /// Which column to extract statistics from
+    column_name: &'static str,
 }
 
 impl Test {
-    fn run(self, col_name: &str) {
+    fn run(self) {
         let Self {
             reader,
             expected_min,
             expected_max,
             expected_null_counts,
             expected_row_counts,
+            column_name,
         } = self;
 
         let min = StatisticsConverter::try_new(
-            col_name,
+            column_name,
             RequestedStatistics::Min,
             reader.schema(),
         )
@@ -151,7 +190,7 @@ impl Test {
         assert_eq!(&min, &expected_min, "Mismatch with expected minimums");
 
         let max = StatisticsConverter::try_new(
-            col_name,
+            column_name,
             RequestedStatistics::Max,
             reader.schema(),
         )
@@ -161,7 +200,7 @@ impl Test {
         assert_eq!(&max, &expected_max, "Mismatch with expected maximum");
 
         let null_counts = StatisticsConverter::try_new(
-            col_name,
+            column_name,
             RequestedStatistics::NullCount,
             reader.schema(),
         )
@@ -181,17 +220,19 @@ impl Test {
         );
     }
 
-    fn run_col_not_found(self, col_name: &str) {
+    /// Run the test and expect a column not found error
+    fn run_col_not_found(self) {
         let Self {
             reader,
             expected_min: _,
             expected_max: _,
             expected_null_counts: _,
             expected_row_counts: _,
+            column_name,
         } = self;
 
         let min = StatisticsConverter::try_new(
-            col_name,
+            column_name,
             RequestedStatistics::Min,
             reader.schema(),
         );
@@ -203,9 +244,7 @@ impl Test {
 // TESTS
 //
 // Remaining cases
-// - Create parquet files / metadata with missing statistic values
-// - Create parquet files / metadata with different data types -- included but not all data types yet
-// - Create parquet files / metadata with different row group sizes -- done
+// f64::NAN
 // - Using truncated statistics ("exact min value" and "exact max value" https://docs.rs/parquet/latest/parquet/file/statistics/enum.Statistics.html#method.max_is_exact)
 
 #[tokio::test]
@@ -222,8 +261,9 @@ async fn test_one_row_group_without_null() {
         expected_null_counts: UInt64Array::from(vec![0]),
         // 3 rows
         expected_row_counts: UInt64Array::from(vec![3]),
+        column_name: "i64",
     }
-    .run("i64")
+    .run()
 }
 
 #[tokio::test]
@@ -241,8 +281,9 @@ async fn test_one_row_group_with_null_and_negative() {
         expected_null_counts: UInt64Array::from(vec![2]),
         // 8 rows
         expected_row_counts: UInt64Array::from(vec![8]),
+        column_name: "i64",
     }
-    .run("i64")
+    .run()
 }
 
 #[tokio::test]
@@ -260,8 +301,9 @@ async fn test_two_row_group_with_null() {
         expected_null_counts: UInt64Array::from(vec![0, 2]),
         // row counts are [10, 5]
         expected_row_counts: UInt64Array::from(vec![10, 5]),
+        column_name: "i64",
     }
-    .run("i64")
+    .run()
 }
 
 #[tokio::test]
@@ -279,8 +321,9 @@ async fn test_two_row_groups_with_all_nulls_in_one() {
         expected_null_counts: UInt64Array::from(vec![1, 3]),
         // row counts are [5, 3]
         expected_row_counts: UInt64Array::from(vec![5, 3]),
+        column_name: "i64",
     }
-    .run("i64")
+    .run()
 }
 
 /////////////// MORE GENERAL TESTS //////////////////////
@@ -291,12 +334,14 @@ async fn test_two_row_groups_with_all_nulls_in_one() {
 // Four different integer types
 #[tokio::test]
 async fn test_int_64() {
-    let row_per_group = 5;
 
     // This creates a parquet file of 4 columns named "i8", "i16", "i32", "i64"
columns named "i8", "i16", "i32", "i64" - let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await; + let reader = TestReader { + scenario: Scenario::Int, + row_per_group: 5, + }; Test { - reader, + reader: reader.build().await, // mins are [-5, -4, 0, 5] expected_min: Arc::new(Int64Array::from(vec![-5, -4, 0, 5])), // maxes are [-1, 0, 4, 9] @@ -305,18 +350,21 @@ async fn test_int_64() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + column_name: "i64", } - .run("i64"); + .run(); } #[tokio::test] async fn test_int_32() { - let row_per_group = 5; // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" - let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await; + let reader = TestReader { + scenario: Scenario::Int, + row_per_group: 5, + }; Test { - reader, + reader: reader.build().await, // mins are [-5, -4, 0, 5] expected_min: Arc::new(Int32Array::from(vec![-5, -4, 0, 5])), // maxes are [-1, 0, 4, 9] @@ -325,8 +373,9 @@ async fn test_int_32() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + column_name: "i32", } - .run("i32"); + .run(); } // BUG: ignore this test for now @@ -337,12 +386,14 @@ async fn test_int_32() { #[ignore] #[tokio::test] async fn test_int_16() { - let row_per_group = 5; // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" - let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await; + let reader = TestReader { + scenario: Scenario::Int, + row_per_group: 5, + }; Test { - reader, + reader: reader.build().await, // mins are [-5, -4, 0, 5] // BUG: not sure why this returns same data but in Int32Array type even though I debugged and the columns name is "i16" an its data is Int16 // My debugging tells me the bug is either at: @@ -361,8 +412,9 @@ async fn test_int_16() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + column_name: "i16", } - .run("i16"); + .run(); } // BUG (same as above): ignore this test for now @@ -370,12 +422,14 @@ async fn test_int_16() { #[ignore] #[tokio::test] async fn test_int_8() { - let row_per_group = 5; // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64" - let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await; + let reader = TestReader { + scenario: Scenario::Int, + row_per_group: 5, + }; Test { - reader, + reader: reader.build().await, // mins are [-5, -4, 0, 5] // BUG: not sure why this returns same data but in Int32Array even though I debugged and the columns name is "i8" an its data is Int8 expected_min: Arc::new(Int8Array::from(vec![-5, -4, 0, 5])), // panic here because the actual data is Int32Array @@ -385,15 +439,14 @@ async fn test_int_8() { expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + column_name: "i8", } - .run("i8"); + .run(); } // timestamp #[tokio::test] async fn test_timestamp() { - let row_per_group = 5; - // This creates a parquet files of 5 columns named "nanos", "micros", "millis", "seconds", "names" // "nanos" --> TimestampNanosecondArray // "micros" --> TimestampMicrosecondArray @@ -403,10 +456,14 @@ async fn test_timestamp() { // // The file is created by 4 record batches, each 
     // Since the row group size is set to 5, those 4 batches will go into 4 row groups
-    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
+    let reader = TestReader {
+        scenario: Scenario::Timestamps,
+        row_per_group: 5,
+    };
 
     Test {
-        reader,
+        reader: reader.build().await,
         // mins are [1577840461000000000, 1577840471000000000, 1577841061000000000, 1578704461000000000,]
         expected_min: Arc::new(Int64Array::from(vec![
             1577840461000000000,
             1577840471000000000,
             1577841061000000000,
             1578704461000000000,
         ])),
@@ -425,13 +482,14 @@ async fn test_timestamp() {
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
         // row counts are [5, 5, 5, 5]
         expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "nanos",
     }
-    .run("nanos");
+    .run();
 
     // micros
-    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
+
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int64Array::from(vec![
             1577840461000000,
             1577840471000000,
             1577841061000000,
             1578704461000000,
         ])),
         expected_max: Arc::new(Int64Array::from(vec![
             1577926861000000,
             1577926871000000,
             1577927461000000,
             1578790861000000,
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "micros",
     }
-    .run("micros");
+    .run();
 
     // millis
-    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int64Array::from(vec![
             1577840461000,
             1577840471000,
             1577841061000,
             1578704461000,
         ])),
         expected_max: Arc::new(Int64Array::from(vec![
             1577926861000,
             1577926871000,
             1577927461000,
             1578790861000,
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "millis",
     }
-    .run("millis");
+    .run();
 
     // seconds
-    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int64Array::from(vec![
             1577840461, 1577840471, 1577841061, 1578704461,
         ])),
         expected_max: Arc::new(Int64Array::from(vec![
             1577926861, 1577926871, 1577927461, 1578790861,
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "seconds",
     }
-    .run("seconds");
+    .run();
 }
 
 // timestamp with different row group sizes
 #[tokio::test]
 async fn test_timestamp_diff_rg_sizes() {
-    let row_per_group = 8;
-
     // This creates a parquet file of 5 columns named "nanos", "micros", "millis", "seconds", "names"
     // "nanos" --> TimestampNanosecondArray
     // "micros" --> TimestampMicrosecondArray
     // "millis" --> TimestampMillisecondArray
     // "seconds" --> TimestampSecondArray
     // "names" --> StringArray
     //
     // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 3 row groups with size 8, 8, 4
-    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
+    let reader = TestReader {
+        scenario: Scenario::Timestamps,
+        row_per_group: 8, // note that the row group size is 8
+    };
 
     Test {
-        reader,
+        reader: reader.build().await,
         // mins are [1577840461000000000, 1577841061000000000, 1578704521000000000]
         expected_min: Arc::new(Int64Array::from(vec![
             1577840461000000000,
             1577841061000000000,
             1578704521000000000,
         ])),
         // maxes are [1577926861000000000, 1578704461000000000, 1578790861000000000]
         expected_max: Arc::new(Int64Array::from(vec![
             1577926861000000000,
             1578704461000000000,
             1578790861000000000,
         ])),
         // null counts are [1, 2, 1]
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
         // row counts are [8, 8, 4]
         expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "nanos",
     }
-    .run("nanos");
+    .run();
 
     // micros
-    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int64Array::from(vec![
             1577840461000000,
             1577841061000000,
@@ -538,13 +598,13 @@ async fn test_timestamp_diff_rg_sizes() {
             1578704521000000,
         ])),
         expected_max: Arc::new(Int64Array::from(vec![
             1577926861000000,
             1578704461000000,
             1578790861000000,
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
         expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "micros",
     }
-    .run("micros");
+    .run();
 
     // millis
-    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int64Array::from(vec![
             1577840461000,
             1577841061000,
@@ -557,13 +617,13 @@ async fn test_timestamp_diff_rg_sizes() {
             1578704521000,
         ])),
         expected_max: Arc::new(Int64Array::from(vec![
             1577926861000,
             1578704461000,
             1578790861000,
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
         expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "millis",
     }
-    .run("millis");
+    .run();
 
     // seconds
-    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int64Array::from(vec![
             1577840461, 1577841061, 1578704521,
         ])),
@@ -572,8 +632,9 @@ async fn test_timestamp_diff_rg_sizes() {
         expected_max: Arc::new(Int64Array::from(vec![
             1577926861, 1578704461, 1578790861,
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
         expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "seconds",
     }
-    .run("seconds");
+    .run();
 }
 
 // date with different row group sizes
@@ -581,18 +642,18 @@ async fn test_timestamp_diff_rg_sizes() {
 // https://github.com/apache/datafusion/issues/10587
 #[tokio::test]
 async fn test_dates_32_diff_rg_sizes() {
-    let row_per_group = 13;
-
     // This creates a parquet file of 3 columns named "date32", "date64", "names"
     // "date32" --> Date32Array
     // "date64" --> Date64Array
     // "names" --> StringArray
     //
     // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 2 row groups with size 13, 7
-    let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await;
-
+    let reader = TestReader {
+        scenario: Scenario::Dates,
+        row_per_group: 13,
+    };
     Test {
-        reader,
+        reader: reader.build().await,
         // mins are [18262, 18565,]
         expected_min: Arc::new(Int32Array::from(vec![18262, 18565])),
         // maxes are [18564, 21865,]
         expected_max: Arc::new(Int32Array::from(vec![18564, 21865])),
         // null counts are [2, 2]
         expected_null_counts: UInt64Array::from(vec![2, 2]),
         // row counts are [13, 7]
         expected_row_counts: UInt64Array::from(vec![13, 7]),
+        column_name: "date32",
     }
-    .run("date32");
+    .run();
 }
 
 // BUG: same as above. Expect to return Date64Array but returns Int32Array
@@ -611,25 +673,26 @@
 #[ignore]
 #[tokio::test]
 async fn test_dates_64_diff_rg_sizes() {
-    let row_per_group = 13;
     // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 2 row groups with size 13, 7
-    let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await;
+    let reader = TestReader {
+        scenario: Scenario::Dates,
+        row_per_group: 13,
+    };
 
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int64Array::from(vec![18262, 18565])), // panic here because the actual data is Int32Array
         expected_max: Arc::new(Int64Array::from(vec![18564, 21865])),
         expected_null_counts: UInt64Array::from(vec![2, 2]),
         expected_row_counts: UInt64Array::from(vec![13, 7]),
+        column_name: "date64",
     }
-    .run("date64");
+    .run();
 }
 
 // BUG:
 // https://github.com/apache/datafusion/issues/10604
 #[tokio::test]
 async fn test_uint() {
-    let row_per_group = 4;
-
     // This creates a parquet file of 4 columns named "u8", "u16", "u32", "u64"
     // "u8" --> UInt8Array
     // "u16" --> UInt16Array
     // "u32" --> UInt32Array
     // "u64" --> UInt64Array
 
     // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 5 row groups with size 4
-    let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
+    let reader = TestReader {
+        scenario: Scenario::UInt,
+        row_per_group: 4,
+    };
 
     // u8
     // BUG: expect UInt8Array but returns Int32Array
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt8Array
         expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt8Array
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+        column_name: "u8",
     }
-    .run("u8");
+    .run();
 
     // u16
     // BUG: expect UInt16Array but returns Int32Array
-    let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt16Array
         expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt16Array
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+        column_name: "u16",
     }
-    .run("u16");
+    .run();
 
     // u32
     // BUG: expect UInt32Array but returns Int32Array
-    let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt32Array
         expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt32Array
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+        column_name: "u32",
     }
-    .run("u32");
+    .run();
 
     // u64
     // BUG: expect UInt64Array but returns Int64Array
-    let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int64Array::from(vec![0, 1, 4, 7, 251])), // should be UInt64Array
         expected_max: Arc::new(Int64Array::from(vec![3, 4, 6, 250, 254])), // should be UInt64Array
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+        column_name: "u64",
column_name: "u64", } - .run("u64"); + .run(); } #[tokio::test] async fn test_int32_range() { - let row_per_group = 5; // This creates a parquet file of 1 column "i" // file has 2 record batches, each has 2 rows. They will be saved into one row group - let reader = parquet_file_many_columns(Scenario::Int32Range, row_per_group).await; + let reader = TestReader { + scenario: Scenario::Int32Range, + row_per_group: 5, + }; Test { - reader, + reader: reader.build().await, expected_min: Arc::new(Int32Array::from(vec![0])), expected_max: Arc::new(Int32Array::from(vec![300000])), expected_null_counts: UInt64Array::from(vec![0]), expected_row_counts: UInt64Array::from(vec![4]), + column_name: "i", } - .run("i"); + .run(); } // BUG: not convert UInt32Array to Int32Array // https://github.com/apache/datafusion/issues/10604 #[tokio::test] async fn test_uint32_range() { - let row_per_group = 5; // This creates a parquet file of 1 column "u" // file has 2 record batches, each has 2 rows. They will be saved into one row group - let reader = parquet_file_many_columns(Scenario::UInt32Range, row_per_group).await; + let reader = TestReader { + scenario: Scenario::UInt32Range, + row_per_group: 5, + }; Test { - reader, - expected_min: Arc::new(Int32Array::from(vec![0])), // shoudld be UInt32Array - expected_max: Arc::new(Int32Array::from(vec![300000])), // shoudld be UInt32Array + reader: reader.build().await, + expected_min: Arc::new(Int32Array::from(vec![0])), // should be UInt32Array + expected_max: Arc::new(Int32Array::from(vec![300000])), // should be UInt32Array expected_null_counts: UInt64Array::from(vec![0]), expected_row_counts: UInt64Array::from(vec![4]), + column_name: "u", } - .run("u"); + .run(); } #[tokio::test] async fn test_float64() { - let row_per_group = 5; // This creates a parquet file of 1 column "f" // file has 4 record batches, each has 5 rows. They will be saved into 4 row groups - let reader = parquet_file_many_columns(Scenario::Float64, row_per_group).await; + let reader = TestReader { + scenario: Scenario::Float64, + row_per_group: 5, + }; Test { - reader, + reader: reader.build().await, expected_min: Arc::new(Float64Array::from(vec![-5.0, -4.0, -0.0, 5.0])), expected_max: Arc::new(Float64Array::from(vec![-1.0, 0.0, 4.0, 9.0])), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + column_name: "f", } - .run("f"); + .run(); } #[tokio::test] async fn test_decimal() { - let row_per_group = 5; // This creates a parquet file of 1 column "decimal_col" with decimal data type and precicion 9, scale 2 // file has 3 record batches, each has 5 rows. 
-    let reader = parquet_file_many_columns(Scenario::Decimal, row_per_group).await;
+    let reader = TestReader {
+        scenario: Scenario::Decimal,
+        row_per_group: 5,
+    };
 
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(
             Decimal128Array::from(vec![100, -500, 2000])
                 .with_precision_and_scale(9, 2)
                 .unwrap(),
         ),
@@ -761,16 +839,15 @@ async fn test_decimal() {
         expected_max: Arc::new(
             Decimal128Array::from(vec![600, 600, 6000])
                 .with_precision_and_scale(9, 2)
                 .unwrap(),
         ),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+        column_name: "decimal_col",
     }
-    .run("decimal_col");
+    .run();
 }
 
 // BUG: not convert BinaryArray to StringArray
 // https://github.com/apache/datafusion/issues/10605
 #[tokio::test]
 async fn test_byte() {
-    let row_per_group = 5;
-
     // This creates a parquet file of 4 columns
     // "name"
     // "service_string"
     // "service_binary"
     // "service_fixedsize"
 
     // file has 3 record batches, each has 5 rows. They will be saved into 3 row groups
-    let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
+    let reader = TestReader {
+        scenario: Scenario::ByteArray,
+        row_per_group: 5,
+    };
 
     // column "name"
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(StringArray::from(vec![
             "all frontends",
             "mixed",
             "all backends",
         ])),
         expected_max: Arc::new(StringArray::from(vec![
             "mixed",
             "rare",
             "mixed",
         ])),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+        column_name: "name",
     }
-    .run("name");
+    .run();
 
     // column "service_string"
-    let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(StringArray::from(vec![
             "frontend five",
             "backend one",
             "backend eight",
         ])),
         expected_max: Arc::new(StringArray::from(vec![
             "frontend two",
             "frontend six",
             "backend six",
         ])),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+        column_name: "service_string",
     }
-    .run("service_string");
+    .run();
 
     // column "service_binary"
-    let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(StringArray::from(vec![
             "frontend five",
             "backend one",
             "backend eight",
         ])),
         expected_max: Arc::new(StringArray::from(vec![
             "frontend two",
             "frontend six",
             "backend six",
         ])), // Should be BinaryArray
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+        column_name: "service_binary",
     }
-    .run("service_binary");
+    .run();
 
     // column "service_fixedsize"
     // b"fe1", b"be1", b"be4"
     let min_input = vec![vec![102, 101, 49], vec![98, 101, 49], vec![98, 101, 52]];
     // b"fe7", b"fe6", b"be8"
     let max_input = vec![vec![102, 101, 55], vec![102, 101, 54], vec![98, 101, 56]];
-    let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
+
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(
             FixedSizeBinaryArray::try_from_iter(min_input.into_iter()).unwrap(),
         ),
         expected_max: Arc::new(
             FixedSizeBinaryArray::try_from_iter(max_input.into_iter()).unwrap(),
         ),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+        column_name: "service_fixedsize",
     }
-    .run("service_fixedsize");
+    .run();
 }
 
 // PeriodsInColumnNames
 #[tokio::test]
 async fn test_period_in_column_names() {
-    let row_per_group = 5;
     // This creates a parquet file of 2 columns "name" and "service.name"
     // file has 3 record batches, each has 5 rows. They will be saved into 3 row groups
-    let reader =
-        parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;
+    let reader = TestReader {
+        scenario: Scenario::PeriodsInColumnNames,
+        row_per_group: 5,
+    };
 
     // column "name"
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(StringArray::from(vec![
             "HTTP GET / DISPATCH",
             "HTTP PUT / DISPATCH",
             "HTTP GET / DISPATCH",
         ])),
         expected_max: Arc::new(StringArray::from(vec![
             "HTTP PUT / DISPATCH",
             "HTTP PUT / DISPATCH",
             "HTTP PUT / DISPATCH",
         ])),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+        column_name: "name",
     }
-    .run("name");
+    .run();
 
     // column "service.name"
-    let reader =
-        parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(StringArray::from(vec!["frontend", "backend", "backend"])),
         expected_max: Arc::new(StringArray::from(vec![
             "frontend", "frontend", "backend",
         ])),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+        column_name: "service.name",
     }
-    .run("service.name");
+    .run();
 }
 
-// TODO:
-// WITHOUT Stats
+// Boolean
+#[tokio::test]
+async fn test_boolean() {
+    // This creates a parquet file of 1 column named "bool"
+    // The file is created by 2 record batches, each has 5 rows --> 2 row groups
+    let reader = TestReader {
+        scenario: Scenario::Boolean,
+        row_per_group: 5,
+    };
+
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(BooleanArray::from(vec![false, false])),
+        expected_max: Arc::new(BooleanArray::from(vec![true, false])),
+        expected_null_counts: UInt64Array::from(vec![1, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5]),
+        column_name: "bool",
+    }
+    .run();
+}
+
+// struct array
+// BUG
+// https://github.com/apache/datafusion/issues/10609
+// Note: since I have not worked with struct arrays before, the bug may be in this test code rather than in the statistics extraction code
+#[ignore]
+#[tokio::test]
+async fn test_struct() {
+    // This creates a parquet file of 1 column named "struct"
+    // The file is created by 1 record batch with 3 rows in the struct array
+    let reader = TestReader {
+        scenario: Scenario::StructArray,
+        row_per_group: 5,
+    };
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(struct_array(vec![(Some(1), Some(6.0), Some(12.0))])),
+        expected_max: Arc::new(struct_array(vec![(Some(2), Some(8.5), Some(14.0))])),
+        expected_null_counts: UInt64Array::from(vec![0]),
+        expected_row_counts: UInt64Array::from(vec![3]),
+        column_name: "struct",
+    }
+    .run();
+}
+
+////// Files with missing statistics ///////
+
+#[tokio::test]
+async fn test_missing_statistics() {
+    let row_per_group = 5;
+    let reader =
+        parquet_file_one_column_stats(0, 4, 7, row_per_group, EnabledStatistics::None);
+
+    Test {
+        reader,
+        expected_min: Arc::new(Int64Array::from(vec![None])),
+        expected_max: Arc::new(Int64Array::from(vec![None])),
+        expected_null_counts: UInt64Array::from(vec![None]),
+        expected_row_counts: UInt64Array::from(vec![3]), // still has row count statistics
+        column_name: "i64",
+    }
+    .run();
+}
 
 /////// NEGATIVE TESTS ///////
 // column not found
 #[tokio::test]
 async fn test_column_not_found() {
-    let row_per_group = 5;
-    let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await;
+    let reader = TestReader {
+        scenario: Scenario::Dates,
+        row_per_group: 5,
+    };
     Test {
-        reader,
+        reader: reader.build().await,
         expected_min: Arc::new(Int64Array::from(vec![18262, 18565])),
         expected_max: Arc::new(Int64Array::from(vec![18564, 21865])),
         expected_null_counts: UInt64Array::from(vec![2, 2]),
         expected_row_counts: UInt64Array::from(vec![13, 7]),
+        column_name: "not_a_column",
     }
-    .run_col_not_found("not_a_column");
+    .run_col_not_found();
 }
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index bdc39c269d299..c5d0ad60bc10f 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -28,7 +28,7 @@ use arrow::{
     record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
-use arrow_array::make_array;
+use arrow_array::{make_array, BooleanArray, Float32Array, StructArray};
 use chrono::{Datelike, Duration, TimeDelta};
 use datafusion::{
     datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
@@ -64,7 +64,9 @@ fn init() {
 // ----------------------
 
 /// What data to use
+#[derive(Debug, Clone, Copy)]
 enum Scenario {
+    Boolean,
     Timestamps,
     Dates,
     Int,
@@ -81,6 +83,7 @@ enum Scenario {
     PeriodsInColumnNames,
     WithNullValues,
     WithNullValuesPageLevel,
+    StructArray,
 }
 
 enum Unit {
@@ -312,6 +315,16 @@ impl ContextWithParquet {
     }
 }
 
+fn make_boolean_batch(v: Vec<Option<bool>>) -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "bool",
+        DataType::Boolean,
+        true,
+    )]));
+    let array = Arc::new(BooleanArray::from(v)) as ArrayRef;
+    RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
+}
+
 /// Return record batch with a few rows of data for all of the supported timestamp types
 /// values with the specified offset
 ///
@@ -699,6 +712,24 @@ fn make_int_batches_with_null(
 
 fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
     match scenario {
+        Scenario::Boolean => {
+            vec![
+                make_boolean_batch(vec![
+                    Some(true),
+                    Some(false),
+                    Some(true),
+                    Some(false),
+                    None,
+                ]),
+                make_boolean_batch(vec![
+                    Some(false),
+                    Some(false),
+                    Some(false),
+                    Some(false),
+                    Some(false),
+                ]),
+            ]
+        }
         Scenario::Timestamps => {
             vec![
                 make_timestamp_batch(TimeDelta::try_seconds(0).unwrap()),
@@ -881,6 +912,20 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
                 make_int_batches_with_null(5, 1, 6),
             ]
         }
+        Scenario::StructArray => {
+            let struct_array_data = struct_array(vec![
+                (Some(1), Some(6.0), Some(12.0)),
+                (Some(2), Some(8.5), None),
+                (None, Some(8.5), Some(14.0)),
+            ]);
+
+            let schema = Arc::new(Schema::new(vec![Field::new(
+                "struct",
+                struct_array_data.data_type().clone(),
+                true,
+            )]));
+            vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()]
+        }
     }
 }
 
@@ -936,3 +981,27 @@ async fn make_test_file_page(scenario: Scenario, row_per_page: usize) -> NamedTe
     writer.close().unwrap();
     output_file
 }
+
+// Returns a struct array with columns "int32_col", "float32_col" and "float64_col" containing the specified values
+fn struct_array(input: Vec<(Option<i32>, Option<f32>, Option<f64>)>) -> ArrayRef {
+    let int_32: Int32Array = input.iter().map(|(i, _, _)| i).collect();
+    let float_32: Float32Array = input.iter().map(|(_, f, _)| f).collect();
+    let float_64: Float64Array = input.iter().map(|(_, _, f)| f).collect();
+
+    let nullable = true;
+    let struct_array = StructArray::from(vec![
+        (
+            Arc::new(Field::new("int32_col", DataType::Int32, nullable)),
+            Arc::new(int_32) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("float32_col", DataType::Float32, nullable)),
+            Arc::new(float_32) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("float64_col", DataType::Float64, nullable)),
+            Arc::new(float_64) as ArrayRef,
+        ),
+    ]);
+    Arc::new(struct_array)
+}
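---

For reference, the extraction flow these tests exercise looks roughly like the sketch below. This is a minimal illustration, not part of the patch: it assumes the `extract(&ParquetMetaData)` method on `StatisticsConverter` that `Test::run` calls in the unchanged lines elided from the hunks above, while `TestReader` and `Scenario::Int` are the helpers added by this patch.

    // Build a test parquet file (5 rows per row group) and open a reader to it
    let reader = TestReader {
        scenario: Scenario::Int,
        row_per_group: 5,
    }
    .build()
    .await;

    // One converter per requested statistic (Min, Max, or NullCount)
    let min = StatisticsConverter::try_new(
        "i64",
        RequestedStatistics::Min,
        reader.schema(),
    )
    .unwrap()
    // assumed API: extracts one value per row group from the file metadata
    .extract(reader.metadata())
    .unwrap();

    // `min` is an ArrayRef with one entry per row group
    assert_eq!(min.len(), reader.metadata().num_row_groups());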