diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index 9087fe989c4b6..3de7482d714c9 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -20,439 +20,6 @@ use ::parquet::arrow::arrow_writer::ArrowWriter; use ::parquet::file::properties::WriterProperties; use datafusion::execution::options::ReadOptions; -#[tokio::test] -async fn window_in_expression() -> Result<()> { - let ctx = SessionContext::new(); - let sql = "select 1 - lag(amount, 1) over (order by idx) as column1 from (values ('a', 1, 100), ('a', 2, 150)) as t (col1, idx, amount)"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+---------+", - "| column1 |", - "+---------+", - "| |", - "| -99 |", - "+---------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_with_agg_in_expression() -> Result<()> { - let ctx = SessionContext::new(); - let sql = "select col1, idx, count(*), sum(amount), lag(sum(amount), 1) over (order by idx) as prev_amount, - sum(amount) - lag(sum(amount), 1) over (order by idx) as difference from ( - select * from (values ('a', 1, 100), ('a', 2, 150)) as t (col1, idx, amount) - ) a - group by col1, idx;"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+------+-----+-----------------+---------------+-------------+------------+", - "| col1 | idx | COUNT(UInt8(1)) | SUM(a.amount) | prev_amount | difference |", - "+------+-----+-----------------+---------------+-------------+------------+", - "| a | 1 | 1 | 100 | | |", - "| a | 2 | 1 | 150 | 100 | 50 |", - "+------+-----+-----------------+---------------+-------------+------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_empty() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT \ - SUM(c3) OVER() as sum1, \ - COUNT(*) OVER () as count1 \ - FROM aggregate_test_100 \ - ORDER BY c9 \ - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+------+--------+", - "| sum1 | count1 |", - "+------+--------+", - "| 781 | 100 |", - "| 781 | 100 |", - "| 781 | 100 |", - "| 781 | 100 |", - "| 781 | 100 |", - "+------+--------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_rows_preceding() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT \ - SUM(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - AVG(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - COUNT(*) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)\ - FROM aggregate_test_100 \ - ORDER BY c9 \ - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------------+----------------------------+-----------------+", - "| SUM(aggregate_test_100.c4) | AVG(aggregate_test_100.c4) | COUNT(UInt8(1)) |", - "+----------------------------+----------------------------+-----------------+", - "| -48302 | -16100.666666666666 | 3 |", - "| 11243 | 3747.6666666666665 | 3 |", - "| -51311 | -17103.666666666668 | 3 |", - "| -2391 | -797.0 | 3 |", - "| 46756 | 15585.333333333334 | 3 |", - "+----------------------------+----------------------------+-----------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_rows_preceding_stddev_variance() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT \ - VAR(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - VAR_POP(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - STDDEV(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - STDDEV_POP(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)\ - FROM aggregate_test_100 \ - ORDER BY c9 \ - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+---------------------------------+------------------------------------+-------------------------------+----------------------------------+", - "| VARIANCE(aggregate_test_100.c4) | VARIANCEPOP(aggregate_test_100.c4) | STDDEV(aggregate_test_100.c4) | STDDEVPOP(aggregate_test_100.c4) |", - "+---------------------------------+------------------------------------+-------------------------------+----------------------------------+", - "| 46721.33333333174 | 31147.555555554496 | 216.15118166073427 | 176.4867007894773 |", - "| 2639429.333333332 | 1759619.5555555548 | 1624.6320609089714 | 1326.5065229977404 |", - "| 746202.3333333324 | 497468.2222222216 | 863.8300372951455 | 705.3142719541563 |", - "| 768422.9999999981 | 512281.9999999988 | 876.5973990378925 | 715.7387791645767 |", - "| 66526.3333333288 | 44350.88888888587 | 257.9269922542594 | 210.5965073045749 |", - "+---------------------------------+------------------------------------+-------------------------------+----------------------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_rows_preceding_with_partition_unique_order_by() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT \ - SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - AVG(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - COUNT(*) OVER(PARTITION BY c2 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)\ - FROM aggregate_test_100 \ - ORDER BY c9 \ - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------------+----------------------------+-----------------+", - "| SUM(aggregate_test_100.c4) | AVG(aggregate_test_100.c4) | COUNT(UInt8(1)) |", - "+----------------------------+----------------------------+-----------------+", - "| -38611 | -19305.5 | 2 |", - "| 17547 | 8773.5 | 2 |", - "| -1301 | -650.5 | 2 |", - "| 26638 | 13319.0 | 3 |", - "| 26861 | 8953.666666666666 | 3 |", - "+----------------------------+----------------------------+-----------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} -/// The partition by clause conducts sorting according to given partition column by default. If the -/// sorting columns have non unique values, the unstable sorting may produce indeterminate results. -/// Therefore, we are commenting out the following test for now. - -// #[tokio::test] -// async fn window_frame_rows_preceding_with_non_unique_partition() -> Result<()> { -// let ctx = SessionContext::new(); -// register_aggregate_csv(&ctx).await?; -// let sql = "SELECT \ -// SUM(c4) OVER(PARTITION BY c1 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ -// COUNT(*) OVER(PARTITION BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)\ -// FROM aggregate_test_100 \ -// ORDER BY c9 \ -// LIMIT 5"; -// let actual = execute_to_batches(&ctx, sql).await; -// let expected = vec![ -// "+----------------------------+-----------------+", -// "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |", -// "+----------------------------+-----------------+", -// "| -33822 | 3 |", -// "| 20808 | 3 |", -// "| -29881 | 3 |", -// "| -47613 | 3 |", -// "| -13474 | 3 |", -// "+----------------------------+-----------------+", -// ]; -// assert_batches_eq!(expected, &actual); -// Ok(()) -// } - -#[tokio::test] -async fn window_frame_ranges_preceding_following_desc() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT \ - SUM(c4) OVER(ORDER BY c2 DESC RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - SUM(c3) OVER(ORDER BY c2 DESC RANGE BETWEEN 10000 PRECEDING AND 10000 FOLLOWING),\ - COUNT(*) OVER(ORDER BY c2 DESC RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) \ - FROM aggregate_test_100 \ - ORDER BY c9 \ - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------------+----------------------------+-----------------+", - "| SUM(aggregate_test_100.c4) | SUM(aggregate_test_100.c3) | COUNT(UInt8(1)) |", - "+----------------------------+----------------------------+-----------------+", - "| 52276 | 781 | 56 |", - "| 260620 | 781 | 63 |", - "| -28623 | 781 | 37 |", - "| 260620 | 781 | 63 |", - "| 260620 | 781 | 63 |", - "+----------------------------+----------------------------+-----------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_asc_desc_large() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT - SUM(c5) OVER (ORDER BY c2 ASC, c6 DESC) as sum1 - FROM aggregate_test_100 - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-------------+", - "| sum1 |", - "+-------------+", - "| -1383162419 |", - "| -3265456275 |", - "| -3909681744 |", - "| -5241214934 |", - "| -4246910946 |", - "+-------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_desc_large() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT - SUM(c5) OVER (ORDER BY c2 DESC, c6 ASC) as sum1 - FROM aggregate_test_100 - ORDER BY c9 - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-------------+", - "| sum1 |", - "+-------------+", - "| 11212193439 |", - "| 22799733943 |", - "| 2935356871 |", - "| 15810962683 |", - "| 18035025006 |", - "+-------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_null_timestamp_order_by() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - SUM(c1) OVER (ORDER BY c2 DESC) as summation1 - FROM null_cases - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+------------+", - "| summation1 |", - "+------------+", - "| 962 |", - "| 962 |", - "| 962 |", - "| 962 |", - "| 962 |", - "+------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_null_desc() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - COUNT(c2) OVER (ORDER BY c1 DESC RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) - FROM null_cases - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------+", - "| COUNT(null_cases.c2) |", - "+----------------------+", - "| 9 |", - "| 9 |", - "| 9 |", - "| 9 |", - "| 9 |", - "+----------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_null_asc() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - COUNT(c2) OVER (ORDER BY c1 RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) - FROM null_cases - ORDER BY c1 - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------+", - "| COUNT(null_cases.c2) |", - "+----------------------+", - "| 2 |", - "| 2 |", - "| 2 |", - "| 2 |", - "| 5 |", - "+----------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_null_asc_null_first() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - COUNT(c2) OVER (ORDER BY c1 NULLS FIRST RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) - FROM null_cases - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------+", - "| COUNT(null_cases.c2) |", - "+----------------------+", - "| 9 |", - "| 9 |", - "| 9 |", - "| 9 |", - "| 9 |", - "+----------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_null_desc_null_last() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - COUNT(c2) OVER (ORDER BY c1 DESC NULLS LAST RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) - FROM null_cases - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------+", - "| COUNT(null_cases.c2) |", - "+----------------------+", - "| 5 |", - "| 5 |", - "| 5 |", - "| 6 |", - "| 6 |", - "+----------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_rows_order_by_null() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as a, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as b, - SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as c, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as d, - SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as e, - SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as f, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as g, - SUM(c1) OVER (ORDER BY c3) as h, - SUM(c1) OVER (ORDER BY c3 DESC) as i, - SUM(c1) OVER (ORDER BY c3 NULLS first) as j, - SUM(c1) OVER (ORDER BY c3 DESC NULLS first) as k, - SUM(c1) OVER (ORDER BY c3 DESC NULLS last) as l, - SUM(c1) OVER (ORDER BY c3, c2) as m, - SUM(c1) OVER (ORDER BY c3, c1 DESC) as n, - SUM(c1) OVER (ORDER BY c3 DESC, c1) as o, - SUM(c1) OVER (ORDER BY c3, c1 NULLs first) as p, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a1, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as b1, - SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as c1, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as d1, - SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as e1, - SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as f1, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as g1, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as h1, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as j1, - SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as k1, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as l1, - SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as m1, - SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as n1, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as o1, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as h11, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as j11, - SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as k11, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as l11, - SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as m11, - SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as n11, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as o11 - FROM null_cases - ORDER BY c3 - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+------+------+-----+-----+------+-----+-----+-----+------+-----+------+------+-----+-----+-----+------+-----+------+------+-----+------+------+-----+------+-----+-----+------+", - "| a | b | c | d | e | f | g | h | i | j | k | l | m | n | o | p | a1 | b1 | c1 | d1 | e1 | f1 | g1 | h1 | j1 | k1 | l1 | m1 | n1 | o1 | h11 | j11 | k11 | l11 | m11 | n11 | o11 |", - "+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+------+------+-----+-----+------+-----+-----+-----+------+-----+------+------+-----+-----+-----+------+-----+------+------+-----+------+------+-----+------+-----+-----+------+", - "| 412 | 412 | 339 | 412 | 339 | 339 | 412 | | 4627 | | 4627 | 4627 | | | 4627 | | 412 | 412 | 4627 | 412 | 4627 | 4627 | 412 | | | 4627 | | 4627 | 4627 | | 4627 | 4627 | | 4627 | | | 4627 |", - "| 488 | 488 | 412 | 488 | 412 | 412 | 488 | 72 | 4627 | 72 | 4627 | 4627 | 72 | 72 | 4627 | 72 | 488 | 488 | 4627 | 488 | 4627 | 4627 | 488 | 72 | 72 | 4627 | 72 | 4627 | 4627 | 72 | 4627 | 4627 | 72 | 4627 | 72 | 72 | 4627 |", - "| 543 | 543 | 488 | 543 | 488 | 488 | 543 | 96 | 4555 | 96 | 4555 | 4555 | 96 | 96 | 4555 | 96 | 543 | 543 | 4627 | 543 | 4627 | 4627 | 543 | 96 | 96 | 4555 | 96 | 4555 | 4555 | 96 | 4555 | 4555 | 96 | 4555 | 96 | 96 | 4555 |", - "| 553 | 553 | 543 | 553 | 543 | 543 | 553 | 115 | 4531 | 115 | 4531 | 4531 | 115 | 115 | 4531 | 115 | 553 | 553 | 4627 | 553 | 4627 | 4627 | 553 | 115 | 115 | 4531 | 115 | 4531 | 4531 | 115 | 4531 | 4531 | 115 | 4531 | 115 | 115 | 4531 |", - "| 553 | 553 | 553 | 553 | 553 | 553 | 553 | 140 | 4512 | 140 | 4512 | 4512 | 140 | 140 | 4512 | 140 | 553 | 553 | 4627 | 553 | 4627 | 4627 | 553 | 140 | 140 | 4512 | 140 | 4512 | 4512 | 140 | 4512 | 4512 | 140 | 4512 | 140 | 140 | 4512 |", - "+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+------+------+-----+-----+------+-----+-----+-----+------+-----+------+------+-----+-----+-----+------+-----+------+------+-----+------+------+-----+------+-----+-----+------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - #[tokio::test] async fn window_frame_rows_preceding_with_unique_partition() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index 7c83c1b9b5550..cbbc82c91653d 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -35,6 +35,16 @@ STORED AS CSV WITH HEADER ROW LOCATION '../../testing/data/csv/aggregate_test_100.csv' +statement ok +CREATE EXTERNAL TABLE null_cases( + c1 BIGINT NULL, + c2 DOUBLE NULL, + c3 BIGINT NULL +) +STORED AS CSV +WITH HEADER ROW +LOCATION 'tests/data/null_cases.csv'; + ### This is the same table as ### execute_with_partition with 4 partitions statement ok @@ -398,3 +408,275 @@ WITH _sample_data AS ( ---- aa 3 2 bb 7 2 + + +# async fn window_in_expression +query I +select 1 - lag(amount, 1) over (order by idx) as column1 from (values ('a', 1, 100), ('a', 2, 150)) as t (col1, idx, amount) +--- +---- +NULL +-99 + + +# async fn window_with_agg_in_expression +query TIIIII +select col1, idx, count(*), sum(amount), lag(sum(amount), 1) over (order by idx) as prev_amount, +sum(amount) - lag(sum(amount), 1) over (order by idx) as difference from ( +select * from (values ('a', 1, 100), ('a', 2, 150)) as t (col1, idx, amount) +) a +group by col1, idx +---- +a 1 1 100 NULL NULL +a 2 1 150 100 50 + + +# async fn window_frame_empty +query II +SELECT +SUM(c3) OVER() as sum1, +COUNT(*) OVER () as count1 +FROM aggregate_test_100 +ORDER BY c9 +LIMIT 5 +---- +781 100 +781 100 +781 100 +781 100 +781 100 + +# async fn window_frame_rows_preceding +query IRI +SELECT +SUM(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +AVG(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +COUNT(*) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) +FROM aggregate_test_100 +ORDER BY c9 +LIMIT 5 +---- +-48302 -16100.666666666666 3 +11243 3747.666666666667 3 +-51311 -17103.666666666668 3 +-2391 -797 3 +46756 15585.333333333334 3 + + +# async fn window_frame_rows_preceding_stddev_variance +query RRRR +SELECT +VAR(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +VAR_POP(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +STDDEV(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +STDDEV_POP(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) +FROM aggregate_test_100 +ORDER BY c9 +LIMIT 5 +---- +46721.33333333174 31147.555555554496 216.151181660734 176.486700789477 +2639429.333333332 1759619.5555555548 1624.632060908971 1326.50652299774 +746202.3333333324 497468.2222222216 863.830037295146 705.314271954156 +768422.9999999981 512281.9999999988 876.597399037893 715.738779164577 +66526.3333333288 44350.88888888587 257.926992254259 210.596507304575 + +# async fn window_frame_rows_preceding_with_partition_unique_order_by +query IRI +SELECT +SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +AVG(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +COUNT(*) OVER(PARTITION BY c2 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) +FROM aggregate_test_100 +ORDER BY c9 +LIMIT 5 +---- +-38611 -19305.5 2 +17547 8773.5 2 +-1301 -650.5 2 +26638 13319 3 +26861 8953.666666666666 3 + +# /// The partition by clause conducts sorting according to given partition column by default. If the +# /// sorting columns have non unique values, the unstable sorting may produce indeterminate results. +# /// Therefore, we are commenting out the following test for now. + +#// #[tokio::test] +#// async fn window_frame_rows_preceding_with_non_unique_partition +#// let ctx = SessionContext::new(); +#// register_aggregate_csv(&ctx).await?; +#// let sql = "SELECT +#// SUM(c4) OVER(PARTITION BY c1 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +#// COUNT(*) OVER(PARTITION BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) +#// FROM aggregate_test_100 +#// ORDER BY c9 +#// LIMIT 5 +#// let actual = execute_to_batches(&ctx, sql).await; +#// let expected = vec![ +#// "+----------------------------+-----------------+", +#// "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |", +#// "+----------------------------+-----------------+", +#// "| -33822 | 3|", +#// "| 20808 | 3|", +#// "| -29881 | 3|", +#// "| -47613 | 3|", +#// "| -13474 | 3|", +#// "+----------------------------+-----------------+", +#// ]; +#// assert_batches_eq!(expected, &actual); +#// Ok(()) +#// } + +# async fn window_frame_ranges_preceding_following_desc +# This query should pass. Tracked in https://github.com/apache/arrow-datafusion/issues/5346 +query error DataFusion error: Internal error: Operator \+ is not implemented +SELECT +SUM(c4) OVER(ORDER BY c2 DESC RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING), +SUM(c3) OVER(ORDER BY c2 DESC RANGE BETWEEN 10000 PRECEDING AND 10000 FOLLOWING), +COUNT(*) OVER(ORDER BY c2 DESC RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) +FROM aggregate_test_100 +ORDER BY c9 +LIMIT 5 + +# async fn window_frame_order_by_asc_desc_large +query I +SELECT + SUM(c5) OVER (ORDER BY c2 ASC, c6 DESC) as sum1 + FROM aggregate_test_100 + LIMIT 5 +---- +-1383162419 +-3265456275 +-3909681744 +-5241214934 +-4246910946 + + +# async fn window_frame_order_by_desc_large +query I +SELECT + SUM(c5) OVER (ORDER BY c2 DESC, c6 ASC) as sum1 + FROM aggregate_test_100 + ORDER BY c9 + LIMIT 5 +---- +11212193439 +22799733943 +2935356871 +15810962683 +18035025006 + +# async fn window_frame_order_by_null_timestamp_order_by +query I +SELECT + SUM(c1) OVER (ORDER BY c2 DESC) as summation1 + FROM null_cases + LIMIT 5 +---- +962 +962 +962 +962 +962 + +# async fn window_frame_order_by_null_desc +query I +SELECT + COUNT(c2) OVER (ORDER BY c1 DESC RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) + FROM null_cases + LIMIT 5 +---- +9 +9 +9 +9 +9 + +# async fn window_frame_order_by_null_asc +query I +SELECT + COUNT(c2) OVER (ORDER BY c1 RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) + FROM null_cases + ORDER BY c1 + LIMIT 5 +---- +2 +2 +2 +2 +5 + +# async fn window_frame_order_by_null_asc_null_first +query I +SELECT + COUNT(c2) OVER (ORDER BY c1 NULLS FIRST RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) + FROM null_cases + LIMIT 5 +---- +9 +9 +9 +9 +9 + +# async fn window_frame_order_by_null_desc_null_last +query I +SELECT + COUNT(c2) OVER (ORDER BY c1 DESC NULLS LAST RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) + FROM null_cases + LIMIT 5 +---- +5 +5 +5 +6 +6 + +# async fn window_frame_rows_order_by_null +query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII +SELECT + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as a, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as b, + SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as c, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as d, + SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as e, + SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as f, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as g, + SUM(c1) OVER (ORDER BY c3) as h, + SUM(c1) OVER (ORDER BY c3 DESC) as i, + SUM(c1) OVER (ORDER BY c3 NULLS first) as j, + SUM(c1) OVER (ORDER BY c3 DESC NULLS first) as k, + SUM(c1) OVER (ORDER BY c3 DESC NULLS last) as l, + SUM(c1) OVER (ORDER BY c3, c2) as m, + SUM(c1) OVER (ORDER BY c3, c1 DESC) as n, + SUM(c1) OVER (ORDER BY c3 DESC, c1) as o, + SUM(c1) OVER (ORDER BY c3, c1 NULLs first) as p, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a1, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as b1, + SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as c1, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as d1, + SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as e1, + SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as f1, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as g1, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as h1, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as j1, + SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as k1, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as l1, + SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as m1, + SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as n1, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as o1, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as h11, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as j11, + SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as k11, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as l11, + SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as m11, + SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as n11, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as o11 + FROM null_cases + ORDER BY c3 + LIMIT 5 +---- +412 412 339 412 339 339 412 NULL 4627 NULL 4627 4627 NULL NULL 4627 NULL 412 412 4627 412 4627 4627 412 NULL NULL 4627 NULL 4627 4627 NULL 4627 4627 NULL 4627 NULL NULL 4627 +488 488 412 488 412 412 488 72 4627 72 4627 4627 72 72 4627 72 488 488 4627 488 4627 4627 488 72 72 4627 72 4627 4627 72 4627 4627 72 4627 72 72 4627 +543 543 488 543 488 488 543 96 4555 96 4555 4555 96 96 4555 96 543 543 4627 543 4627 4627 543 96 96 4555 96 4555 4555 96 4555 4555 96 4555 96 96 4555 +553 553 543 553 543 543 553 115 4531 115 4531 4531 115 115 4531 115 553 553 4627 553 4627 4627 553 115 115 4531 115 4531 4531 115 4531 4531 115 4531 115 115 4531 +553 553 553 553 553 553 553 140 4512 140 4512 4512 140 140 4512 140 553 553 4627 553 4627 4627 553 140 140 4512 140 4512 4512 140 4512 4512 140 4512 140 140 4512 diff --git a/datafusion/expr/src/window_frame.rs b/datafusion/expr/src/window_frame.rs index c25d2491e45ac..7794125d0eecf 100644 --- a/datafusion/expr/src/window_frame.rs +++ b/datafusion/expr/src/window_frame.rs @@ -164,7 +164,7 @@ pub fn regularize(mut frame: WindowFrame, order_bys: usize) -> Result