Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 202 additions & 3 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

//! Helper functions for the table implementation

use std::collections::HashMap;
use std::sync::Arc;

use super::PartitionedFile;
use crate::datasource::listing::ListingTableUrl;
use crate::execution::context::SessionState;
use crate::logical_expr::{BinaryExpr, Operator};
use crate::{error::Result, scalar::ScalarValue};

use arrow::{
Expand Down Expand Up @@ -169,9 +171,17 @@ async fn list_partitions(
store: &dyn ObjectStore,
table_path: &ListingTableUrl,
max_depth: usize,
partition_prefix: Option<Path>,
) -> Result<Vec<Partition>> {
let partition = Partition {
path: table_path.prefix().clone(),
path: match partition_prefix {
Some(prefix) => Path::from_iter(
Path::from(table_path.prefix().as_ref())
.parts()
.chain(Path::from(prefix.as_ref()).parts()),
),
None => table_path.prefix().clone(),
},
depth: 0,
files: None,
};
Expand Down Expand Up @@ -305,6 +315,80 @@ async fn prune_partitions(
Ok(filtered)
}

#[derive(Debug)]
enum PartitionValue {
Single(String),
Multi,
}

fn populate_partition_values<'a>(
partition_values: &mut HashMap<&'a str, PartitionValue>,
filter: &'a Expr,
) {
if let Expr::BinaryExpr(BinaryExpr {
ref left,
op,
ref right,
}) = filter
{
match op {
Operator::Eq => match (left.as_ref(), right.as_ref()) {
(Expr::Column(Column { ref name, .. }), Expr::Literal(val))
| (Expr::Literal(val), Expr::Column(Column { ref name, .. })) => {
if partition_values
.insert(name, PartitionValue::Single(val.to_string()))
.is_some()
{
partition_values.insert(name, PartitionValue::Multi);
}
}
_ => {}
},
Operator::And => {
populate_partition_values(partition_values, left);
populate_partition_values(partition_values, right);
}
_ => {}
}
}
}

fn evaluate_partition_prefix<'a>(
partition_cols: &'a [(String, DataType)],
filters: &'a [Expr],
) -> Option<Path> {
let mut partition_values = HashMap::new();
for filter in filters {
populate_partition_values(&mut partition_values, filter);
}

if partition_values.is_empty() {
return None;
}

let mut parts = vec![];
for (p, _) in partition_cols {
match partition_values.get(p.as_str()) {
Some(PartitionValue::Single(val)) => {
// if a partition only has a single literal value, then it can be added to the
// prefix
parts.push(format!("{p}={val}"));
}
_ => {
// break on the first unconstrainted partition to create a common prefix
// for all covered partitions.
break;
}
}
}

if parts.is_empty() {
None
} else {
Some(Path::from_iter(parts))
}
}

/// Discover the partitions on the given path and prune out files
/// that belong to irrelevant partitions using `filters` expressions.
/// `filters` might contain expressions that can be resolved only at the
Expand All @@ -327,7 +411,10 @@ pub async fn pruned_partition_list<'a>(
));
}

let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
let partitions =
list_partitions(store, table_path, partition_cols.len(), partition_prefix)
.await?;
debug!("Listed {} partitions", partitions.len());

let pruned =
Expand Down Expand Up @@ -416,7 +503,9 @@ where
mod tests {
use std::ops::Not;

use crate::logical_expr::{case, col, lit};
use futures::StreamExt;

use crate::logical_expr::{case, col, lit, Expr};
use crate::test::object_store::make_test_store_and_state;

use super::*;
Expand Down Expand Up @@ -675,4 +764,114 @@ mod tests {
// this helper function
assert!(expr_applicable_for_cols(&[], &lit(true)));
}

#[test]
fn test_evaluate_partition_prefix() {
let partitions = &[
("a".to_string(), DataType::Utf8),
("b".to_string(), DataType::Int16),
("c".to_string(), DataType::Boolean),
];

assert_eq!(
evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo"))]),
Some(Path::from("a=foo")),
);

assert_eq!(
evaluate_partition_prefix(partitions, &[lit("foo").eq(col("a"))]),
Some(Path::from("a=foo")),
);

assert_eq!(
evaluate_partition_prefix(
partitions,
&[col("a").eq(lit("foo")).and((col("b").eq(lit("bar"))))],
),
Some(Path::from("a=foo/b=bar")),
);

assert_eq!(
evaluate_partition_prefix(
partitions,
// list of filters should be evaluated as AND
&[col("a").eq(lit("foo")), col("b").eq(lit("bar")),],
),
Some(Path::from("a=foo/b=bar")),
);

assert_eq!(
evaluate_partition_prefix(
partitions,
&[col("a")
.eq(lit("foo"))
.and(col("b").eq(lit("1")))
.and(col("c").eq(lit("true")))],
),
Some(Path::from("a=foo/b=1/c=true")),
);

// no prefix when filter is empty
assert_eq!(evaluate_partition_prefix(partitions, &[]), None);

// b=foo results in no prefix because a is not restricted
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend adding some other tests:

  1. another negative test like a < 5 to cover the fact that only = predicates are allowed
  2. A test with the literal and constant swapped (foo = a)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, good suggestions.

assert_eq!(
evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))]),
None,
);

// a=foo and c=baz only results in preifx a=foo because b is not restricted
assert_eq!(
evaluate_partition_prefix(
partitions,
&[col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))],
),
Some(Path::from("a=foo")),
);

// partition with multiple values results in no prefix
assert_eq!(
evaluate_partition_prefix(
partitions,
&[Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
),
None,
);

// no prefix because partition a is not restricted to a single literal
assert_eq!(
evaluate_partition_prefix(
partitions,
&[Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
),
None,
);
assert_eq!(
evaluate_partition_prefix(partitions, &[col("b").lt(lit(5))],),
None,
);
}

#[test]
fn test_evaluate_date_partition_prefix() {
let partitions = &[("a".to_string(), DataType::Date32)];
assert_eq!(
evaluate_partition_prefix(
partitions,
&[col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3))))],
),
Some(Path::from("a=1970-01-04")),
);

let partitions = &[("a".to_string(), DataType::Date64)];
assert_eq!(
evaluate_partition_prefix(
partitions,
&[col("a").eq(Expr::Literal(ScalarValue::Date64(Some(
4 * 24 * 60 * 60 * 1000
)))),],
),
Some(Path::from("a=1970-01-05")),
);
}
}