From f609f0237a21a490ac9a5beee3ddcdc2b3cfefb9 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Mon, 5 Aug 2024 15:34:31 -0700 Subject: [PATCH 1/4] chore: bump DataFusion to rev c6f0d3c --- native/Cargo.lock | 28 +++++++-------- native/Cargo.toml | 14 ++++---- .../core/src/execution/datafusion/planner.rs | 34 ++++++++++++++++--- 3 files changed, 50 insertions(+), 26 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 3f6b1d1c71..ef3d3cf2e9 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -804,7 +804,7 @@ dependencies = [ [[package]] name = "datafusion" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "ahash", "arrow", @@ -851,7 +851,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "arrow-schema", "async-trait", @@ -948,7 +948,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "ahash", "arrow", @@ -968,7 +968,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "tokio", ] @@ -976,7 +976,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "arrow", "chrono", @@ -996,7 +996,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "ahash", "arrow", @@ -1014,7 +1014,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "arrow", "arrow-buffer", @@ -1040,7 +1040,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "ahash", "arrow", @@ -1057,7 +1057,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "arrow", "async-trait", @@ -1076,7 +1076,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "ahash", "arrow", @@ -1105,7 +1105,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "ahash", "arrow", @@ -1118,7 +1118,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "datafusion-common", "datafusion-execution", @@ -1129,7 +1129,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "ahash", "arrow", @@ -1162,7 +1162,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "40.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=35c2e7e#35c2e7e7eb04e80877bbbc1fa4a5b06f31a4e4bc" +source = "git+https://github.com/apache/datafusion.git?rev=c6f0d3c#c6f0d3cac93ef1436313160f1dba878745d693bb" dependencies = [ "arrow", "arrow-array", diff --git a/native/Cargo.toml b/native/Cargo.toml index c6cf571a76..086a405d29 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -39,13 +39,13 @@ arrow-buffer = { version = "52.2.0" } arrow-data = { version = "52.2.0" } arrow-schema = { version = "52.2.0" } parquet = { version = "52.2.0", default-features = false, features = ["experimental"] } -datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e" } -datafusion = { default-features = false, git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", features = ["unicode_expressions", "crypto_expressions"] } -datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", features = ["crypto_expressions"] } -datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", default-features = false } -datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", default-features = false } -datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", default-features = false } -datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "35c2e7e", default-features = false } +datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c" } +datafusion = { default-features = false, git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", features = ["unicode_expressions", "crypto_expressions"] } +datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", features = ["crypto_expressions"] } +datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", default-features = false } +datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", default-features = false } +datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", default-features = false } +datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", default-features = false } datafusion-comet-spark-expr = { path = "spark-expr", version = "0.2.0" } datafusion-comet-proto = { path = "proto", version = "0.2.0" } chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index b4d723eb1f..36b58c98e9 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -20,6 +20,8 @@ use arrow_schema::{DataType, Field, Schema, TimeUnit, DECIMAL128_MAX_PRECISION}; use datafusion::functions_aggregate::bit_and_or_xor::{bit_and_udaf, bit_or_udaf, bit_xor_udaf}; use datafusion::functions_aggregate::count::count_udaf; +use datafusion::functions_aggregate::min_max::max_udaf; +use datafusion::functions_aggregate::min_max::min_udaf; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::physical_plan::windows::BoundedWindowAggExec; use datafusion::physical_plan::InputOrderMode; @@ -33,7 +35,7 @@ use datafusion::{ execution_props::ExecutionProps, expressions::{ in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, - Literal as DataFusionLiteral, Max, Min, NotExpr, + Literal as DataFusionLiteral, NotExpr, }, AggregateExpr, PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr, }, @@ -1254,14 +1256,36 @@ impl PhysicalPlanner { .map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Min(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; + let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); - Ok(Arc::new(Min::new(child, "min", datatype))) + let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); + create_aggregate_expr( + &min_udaf(), + &[child], + &[], + &[], + &[], + schema.as_ref(), + "min", + false, + false, + ).map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Max(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; + let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); - Ok(Arc::new(Max::new(child, "max", datatype))) + let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); + create_aggregate_expr( + &max_udaf(), + &[child], + &[], + &[], + &[], + schema.as_ref(), + "max", + false, + false, + ).map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Sum(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; From 034b71a50087f7679f2f723a0459d1864b37933a Mon Sep 17 00:00:00 2001 From: huaxingao Date: Mon, 5 Aug 2024 16:27:59 -0700 Subject: [PATCH 2/4] fix style --- native/core/src/execution/datafusion/planner.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 36b58c98e9..4213cc038c 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1269,7 +1269,8 @@ impl PhysicalPlanner { "min", false, false, - ).map_err(|e| ExecutionError::DataFusionError(e.to_string())) + ) + .map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Max(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; @@ -1285,7 +1286,8 @@ impl PhysicalPlanner { "max", false, false, - ).map_err(|e| ExecutionError::DataFusionError(e.to_string())) + ) + .map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Sum(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; From 7ef995af3013a9a3b7ab558dd76c185ca952a257 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Mon, 5 Aug 2024 18:56:38 -0700 Subject: [PATCH 3/4] Trigger Build From e0bec8818307fdfdc59f22e6bce6d92760f6a4bf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 6 Aug 2024 11:26:15 -0600 Subject: [PATCH 4/4] Update native/Cargo.toml --- native/Cargo.toml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/native/Cargo.toml b/native/Cargo.toml index 086a405d29..a9b7d4fbdf 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -39,13 +39,13 @@ arrow-buffer = { version = "52.2.0" } arrow-data = { version = "52.2.0" } arrow-schema = { version = "52.2.0" } parquet = { version = "52.2.0", default-features = false, features = ["experimental"] } -datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c" } -datafusion = { default-features = false, git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", features = ["unicode_expressions", "crypto_expressions"] } -datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", features = ["crypto_expressions"] } -datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", default-features = false } -datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", default-features = false } -datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", default-features = false } -datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "c6f0d3c", default-features = false } +datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "f4e519f" } +datafusion = { default-features = false, git = "https://github.com/apache/datafusion.git", rev = "f4e519f", features = ["unicode_expressions", "crypto_expressions"] } +datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "f4e519f", features = ["crypto_expressions"] } +datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "f4e519f", default-features = false } +datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "f4e519f", default-features = false } +datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "f4e519f", default-features = false } +datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "f4e519f", default-features = false } datafusion-comet-spark-expr = { path = "spark-expr", version = "0.2.0" } datafusion-comet-proto = { path = "proto", version = "0.2.0" } chrono = { version = "0.4", default-features = false, features = ["clock"] }