diff --git a/datafusion/core/tests/sqllogictests/test_files/timestamps.slt b/datafusion/core/tests/sqllogictests/test_files/timestamps.slt
index 1958adea13eaf..7ca513b99bad1 100644
--- a/datafusion/core/tests/sqllogictests/test_files/timestamps.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/timestamps.slt
@@ -88,6 +88,12 @@ SELECT DATE_BIN('15 minutes', '2022-08-03 14:38:50Z', '1970-01-01T00:00:00Z')
 ----
 2022-08-03T14:30:00
 
+# Call in two arguments (should be the same as the above query)
+query B
+SELECT DATE_BIN('15 minutes', '2022-08-03 14:38:50Z') = DATE_BIN('15 minutes', '2022-08-03 14:38:50Z', '1970-01-01T00:00:00Z')
+----
+true
+
 # Shift forward by 5 minutes
 query P
 SELECT DATE_BIN(INTERVAL '15 minutes', TIMESTAMP '2022-08-03 14:38:50Z', TIMESTAMP '1970-01-01T00:05:00Z')
diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs
index cfb9b3baa22a5..defb41d45288a 100644
--- a/datafusion/expr/src/function.rs
+++ b/datafusion/expr/src/function.rs
@@ -448,11 +448,17 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature {
             ],
             fun.volatility(),
         ),
-        BuiltinScalarFunction::DateBin => Signature::exact(
+        BuiltinScalarFunction::DateBin => Signature::one_of(
             vec![
-                DataType::Interval(IntervalUnit::DayTime),
-                DataType::Timestamp(TimeUnit::Nanosecond, None),
-                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                TypeSignature::Exact(vec![
+                    DataType::Interval(IntervalUnit::DayTime),
+                    DataType::Timestamp(TimeUnit::Nanosecond, None),
+                    DataType::Timestamp(TimeUnit::Nanosecond, None),
+                ]),
+                TypeSignature::Exact(vec![
+                    DataType::Interval(IntervalUnit::DayTime),
+                    DataType::Timestamp(TimeUnit::Nanosecond, None),
+                ]),
             ],
             fun.volatility(),
         ),
diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs
index 4ce1f18e8b83e..70f053328d89c 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -319,12 +319,18 @@ fn date_bin_single(stride: i64, source: i64, origin: i64) -> i64 {
 
 /// DATE_BIN sql function
 pub fn date_bin(args: &[ColumnarValue]) -> Result<ColumnarValue> {
-    if args.len() != 3 {
-        return Err(DataFusionError::Execution(
-            "DATE_BIN expected three arguments".to_string(),
-        ));
+    match args.len() {
+        2 => date_bin_2args(args),
+        3 => date_bin_3args(args),
+        _ => Err(DataFusionError::Execution(
+            "DATE_BIN expected two or three arguments".to_string(),
+        )),
     }
+}
 
+/// Three-argument DATE_BIN: `args` is `[stride, source, origin]`; the caller
+/// (`date_bin`) has already checked the argument count.
+fn date_bin_3args(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     let (stride, array, origin) = (&args[0], &args[1], &args[2]);
 
     let stride = match stride {
@@ -398,6 +404,20 @@ pub fn date_bin(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     })
 }
 
+/// Two-argument DATE_BIN: `args` is `[stride, source]`.
+///
+/// Bins `source` relative to the Unix epoch by appending a default origin
+/// (`TimestampNanosecond(Some(0), "+00:00")`) and delegating to
+/// `date_bin_3args`, so argument validation and binning live in one place.
+fn date_bin_2args(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
+        Some(0),
+        Some("+00:00".to_owned()),
+    ));
+
+    date_bin_3args(&[args[0].clone(), args[1].clone(), origin])
+}
+
 macro_rules! extract_date_part {
     ($ARRAY: expr, $FN:expr) => {
         match $ARRAY.data_type() {
@@ -783,18 +803,22 @@ mod tests {
         ]);
         assert!(res.is_ok());
 
+        let res = date_bin(&[
+            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))),
+            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
+        ]);
+        assert!(res.is_ok());
+
         //
         // Fallible test cases
         //
 
         // invalid number of arguments
-        let res = date_bin(&[
-            ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))),
-            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
-        ]);
+        let res =
+            date_bin(&[ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1)))]);
         assert_eq!(
             res.err().unwrap().to_string(),
-            "Execution error: DATE_BIN expected three arguments"
+            "Execution error: DATE_BIN expected two or three arguments"
         );
 
         // stride: invalid type