Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 109 additions & 3 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
use arrow::array::Decimal128Array;
use arrow::{
array::{
Array, ArrayRef, Date32Array, Date64Array, Float64Array, Int32Array, StringArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
Array, ArrayRef, BinaryArray, Date32Array, Date64Array, FixedSizeBinaryArray,
Float64Array, Int32Array, StringArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
Expand Down Expand Up @@ -70,6 +70,7 @@ enum Scenario {
DecimalBloomFilterInt64,
DecimalLargePrecision,
DecimalLargePrecisionBloomFilter,
ByteArray,
PeriodsInColumnNames,
}

Expand Down Expand Up @@ -506,6 +507,51 @@ fn make_date_batch(offset: Duration) -> RecordBatch {
.unwrap()
}

/// returns a batch with two columns (note "service.name" is the name
/// of the column. It is *not* a table named service.name
///
/// name | service.name
fn make_bytearray_batch(
name: &str,
string_values: Vec<&str>,
binary_values: Vec<&[u8]>,
fixedsize_values: Vec<&[u8; 3]>,
) -> RecordBatch {
let num_rows = string_values.len();
let name: StringArray = std::iter::repeat(Some(name)).take(num_rows).collect();
let service_string: StringArray = string_values.iter().map(Some).collect();
let service_binary: BinaryArray = binary_values.iter().map(Some).collect();
let service_fixedsize: FixedSizeBinaryArray = fixedsize_values
.iter()
.map(|value| Some(value.as_slice()))
.collect::<Vec<_>>()
.into();

let schema = Schema::new(vec![
Field::new("name", name.data_type().clone(), true),
// note the column name has a period in it!
Field::new("service_string", service_string.data_type().clone(), true),
Field::new("service_binary", service_binary.data_type().clone(), true),
Field::new(
"service_fixedsize",
service_fixedsize.data_type().clone(),
true,
),
]);
let schema = Arc::new(schema);

RecordBatch::try_new(
schema,
vec![
Arc::new(name),
Arc::new(service_string),
Arc::new(service_binary),
Arc::new(service_fixedsize),
],
)
.unwrap()
}

/// returns a batch with two columns (note "service.name" is the name
/// of the column. It is *not* a table named service.name
///
Expand Down Expand Up @@ -604,6 +650,66 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_decimal_batch(vec![100000, 200000, 300000, 400000, 600000], 38, 5),
]
}
Scenario::ByteArray => {
// frontends first, then backends. All in order, except frontends 4 and 7
// are swapped to cause a statistics false positive on the 'fixed size' column.
vec![
make_bytearray_batch(
"all frontends",
vec![
"frontend one",
"frontend two",
"frontend three",
"frontend seven",
"frontend five",
],
vec![
b"frontend one",
b"frontend two",
b"frontend three",
b"frontend seven",
b"frontend five",
],
vec![b"fe1", b"fe2", b"fe3", b"fe7", b"fe5"],
),
make_bytearray_batch(
"mixed",
vec![
"frontend six",
"frontend four",
"backend one",
"backend two",
"backend three",
],
vec![
b"frontend six",
b"frontend four",
b"backend one",
b"backend two",
b"backend three",
],
vec![b"fe6", b"fe4", b"be1", b"be2", b"be3"],
),
make_bytearray_batch(
"all backends",
vec![
"backend four",
"backend five",
"backend six",
"backend seven",
"backend eight",
],
vec![
b"backend four",
b"backend five",
b"backend six",
b"backend seven",
b"backend eight",
],
vec![b"be4", b"be5", b"be6", b"be7", b"be8"],
),
]
}
Scenario::PeriodsInColumnNames => {
vec![
// all frontend
Expand Down
102 changes: 102 additions & 0 deletions datafusion/core/tests/parquet/row_group_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,108 @@ async fn prune_decimal_in_list() {
.await;
}

#[tokio::test]
async fn prune_string_eq_match() {
RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string = 'backend one'",
)
.with_expected_errors(Some(0))
// false positive on 'all backends' batch: 'backend five' < 'backend one' < 'backend three'
.with_matched_by_stats(Some(2))
.with_pruned_by_stats(Some(1))
.with_matched_by_bloom_filter(Some(1))
.with_pruned_by_bloom_filter(Some(1))
.with_expected_rows(1)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn prune_string_eq_no_match() {
RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string = 'backend nine'",
)
.with_expected_errors(Some(0))
// false positive on 'all backends' batch: 'backend five' < 'backend one' < 'backend three'
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(2))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(1))
.with_expected_rows(0)
.test_row_group_prune()
.await;

RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string = 'frontend nine'",
)
.with_expected_errors(Some(0))
// false positive on 'all frontends' batch: 'frontend five' < 'frontend nine' < 'frontend two'
// false positive on 'mixed' batch: 'backend one' < 'frontend nine' < 'frontend six'
.with_matched_by_stats(Some(2))
.with_pruned_by_stats(Some(1))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(2))
.with_expected_rows(0)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn prune_string_neq() {
RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string != 'backend one'",
)
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(3))
.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(3))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(14)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn prune_string_lt() {
RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string < 'backend one'",
)
.with_expected_errors(Some(0))
// matches 'all backends' only
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(2))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(3)
.test_row_group_prune()
.await;

RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string < 'backend zero'",
)
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(2))
.with_pruned_by_stats(Some(1))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
// all backends from 'mixed' and 'all backends'
.with_expected_rows(8)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn prune_periods_in_column_names() {
// There are three row groups for "service.name", each with 5 rows = 15 rows total
Expand Down