Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/timestamps.slt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ SELECT DATE_BIN('15 minutes', '2022-08-03 14:38:50Z', '1970-01-01T00:00:00Z')
----
2022-08-03T14:30:00

# Call in two arguments (should be the same as the above query)
query B
SELECT DATE_BIN('15 minutes', '2022-08-03 14:38:50Z') = DATE_BIN('15 minutes', '2022-08-03 14:38:50Z', '1970-01-01T00:00:00Z')
----
true

# Shift forward by 5 minutes
query P
SELECT DATE_BIN(INTERVAL '15 minutes', TIMESTAMP '2022-08-03 14:38:50Z', TIMESTAMP '1970-01-01T00:05:00Z')
Expand Down
14 changes: 10 additions & 4 deletions datafusion/expr/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,11 +448,17 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature {
],
fun.volatility(),
),
BuiltinScalarFunction::DateBin => Signature::exact(
BuiltinScalarFunction::DateBin => Signature::one_of(
vec![
DataType::Interval(IntervalUnit::DayTime),
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
TypeSignature::Exact(vec![
DataType::Interval(IntervalUnit::DayTime),
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
TypeSignature::Exact(vec![
DataType::Interval(IntervalUnit::DayTime),
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
],
fun.volatility(),
),
Expand Down
106 changes: 97 additions & 9 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,18 @@ fn date_bin_single(stride: i64, source: i64, origin: i64) -> i64 {

/// DATE_BIN sql function
pub fn date_bin(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 3 {
return Err(DataFusionError::Execution(
"DATE_BIN expected three arguments".to_string(),
));
if args.len() == 2 {
date_bin_2args(args)
} else if args.len() == 3 {
date_bin_3args(args)
} else {
Err(DataFusionError::Execution(
"DATE_BIN expected two or three arguments".to_string(),
))
}
}

fn date_bin_3args(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let (stride, array, origin) = (&args[0], &args[1], &args[2]);

let stride = match stride {
Expand Down Expand Up @@ -398,6 +404,84 @@ pub fn date_bin(args: &[ColumnarValue]) -> Result<ColumnarValue> {
})
}

fn date_bin_2args(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let origin = &ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(0),
Some("+00:00".to_owned()),
));
let (stride, array) = (&args[0], &args[1]);

let stride = match stride {
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
let (days, ms) = IntervalDayTimeType::to_parts(*v);
let nanos = (Duration::days(days as i64) + Duration::milliseconds(ms as i64))
.num_nanoseconds();
match nanos {
Some(v) => v,
_ => {
return Err(DataFusionError::Execution(
"DATE_BIN stride argument is too large".to_string(),
))
}
}
}
ColumnarValue::Scalar(v) => {
return Err(DataFusionError::Execution(format!(
"DATE_BIN expects stride argument to be an INTERVAL but got {}",
v.get_datatype()
)))
}
ColumnarValue::Array(_) => return Err(DataFusionError::NotImplemented(
"DATE_BIN only supports literal values for the stride argument, not arrays"
.to_string(),
)),
};

let origin = match origin {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
ColumnarValue::Scalar(v) => {
return Err(DataFusionError::Execution(format!(
"DATE_BIN expects origin argument to be a TIMESTAMP but got {}",
v.get_datatype()
)))
}
ColumnarValue::Array(_) => return Err(DataFusionError::NotImplemented(
"DATE_BIN only supports literal values for the origin argument, not arrays"
.to_string(),
)),
};

let f = |x: Option<i64>| x.map(|x| date_bin_single(stride, x, origin));

Ok(match array {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(f(*v), tz_opt.clone()))
}
ColumnarValue::Array(array) => match array.data_type() {
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
let array = as_timestamp_nanosecond_array(array)?
.iter()
.map(f)
.collect::<TimestampNanosecondArray>();

ColumnarValue::Array(Arc::new(array))
}
_ => {
return Err(DataFusionError::Execution(format!(
"DATE_BIN expects source argument to be a TIMESTAMP but got {}",
array.data_type()
)))
}
},
_ => {
return Err(DataFusionError::Execution(
"DATE_BIN expects source argument to be a TIMESTAMP scalar or array"
.to_string(),
));
}
})
}

macro_rules! extract_date_part {
($ARRAY: expr, $FN:expr) => {
match $ARRAY.data_type() {
Expand Down Expand Up @@ -783,18 +867,22 @@ mod tests {
]);
assert!(res.is_ok());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a test case verifying two-argument call equal to three-argument one with epoch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this kind of test in timestamps.slt

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could verify using a query like

SELECT 
  DATE_BIN('15 minutes', '2022-08-03 14:38:50Z', '1970-01-01T00:00:00Z') = 
  DATE_BIN('15 minutes', '2022-08-03 14:38:50Z')

And ensure it is true


let res = date_bin(&[
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
]);
assert!(res.is_ok());

//
// Fallible test cases
//

// invalid number of arguments
let res = date_bin(&[
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
]);
let res =
date_bin(&[ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1)))]);
assert_eq!(
res.err().unwrap().to_string(),
"Execution error: DATE_BIN expected three arguments"
"Execution error: DATE_BIN expected two or three arguments"
);

// stride: invalid type
Expand Down