From c526614ebbf6d1ab0aa95f43bda9b89ca00e8a37 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 20 Mar 2023 15:35:26 -0700 Subject: [PATCH] simplify Result type --- datafusion/common/src/cast.rs | 70 +++++++-------- .../src/physical_plan/aggregates/row_hash.rs | 3 +- datafusion/core/src/test_util/mod.rs | 6 +- .../core/tests/parquet/custom_reader.rs | 4 +- datafusion/expr/src/logical_plan/plan.rs | 36 +++----- datafusion/expr/src/utils.rs | 2 +- .../src/simplify_expressions/regex.rs | 4 +- datafusion/optimizer/src/utils.rs | 4 +- .../physical-expr/src/aggregate/variance.rs | 3 +- datafusion/physical-expr/src/utils.rs | 4 +- datafusion/proto/src/common.rs | 8 +- .../proto/src/logical_plan/from_proto.rs | 4 +- datafusion/proto/src/logical_plan/mod.rs | 89 ++++++++----------- .../proto/src/physical_plan/from_proto.rs | 16 ++-- datafusion/proto/src/physical_plan/mod.rs | 71 +++++++-------- .../proto/src/physical_plan/to_proto.rs | 8 +- 16 files changed, 147 insertions(+), 185 deletions(-) diff --git a/datafusion/common/src/cast.rs b/datafusion/common/src/cast.rs index 5e0bfbbd63b5c..fe909e7758156 100644 --- a/datafusion/common/src/cast.rs +++ b/datafusion/common/src/cast.rs @@ -20,7 +20,7 @@ //! but provide an error message rather than a panic, as the corresponding //! kernels in arrow-rs such as `as_boolean_array` do. -use crate::{downcast_value, DataFusionError}; +use crate::{downcast_value, DataFusionError, Result}; use arrow::{ array::{ Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, @@ -35,172 +35,162 @@ use arrow::{ }; // Downcast ArrayRef to Date32Array -pub fn as_date32_array(array: &dyn Array) -> Result<&Date32Array, DataFusionError> { +pub fn as_date32_array(array: &dyn Array) -> Result<&Date32Array> { Ok(downcast_value!(array, Date32Array)) } // Downcast ArrayRef to StructArray -pub fn as_struct_array(array: &dyn Array) -> Result<&StructArray, DataFusionError> { +pub fn as_struct_array(array: &dyn Array) -> Result<&StructArray> { Ok(downcast_value!(array, StructArray)) } // Downcast ArrayRef to Int32Array -pub fn as_int32_array(array: &dyn Array) -> Result<&Int32Array, DataFusionError> { +pub fn as_int32_array(array: &dyn Array) -> Result<&Int32Array> { Ok(downcast_value!(array, Int32Array)) } // Downcast ArrayRef to Int64Array -pub fn as_int64_array(array: &dyn Array) -> Result<&Int64Array, DataFusionError> { +pub fn as_int64_array(array: &dyn Array) -> Result<&Int64Array> { Ok(downcast_value!(array, Int64Array)) } // Downcast ArrayRef to Decimal128Array -pub fn as_decimal128_array( - array: &dyn Array, -) -> Result<&Decimal128Array, DataFusionError> { +pub fn as_decimal128_array(array: &dyn Array) -> Result<&Decimal128Array> { Ok(downcast_value!(array, Decimal128Array)) } // Downcast ArrayRef to Float32Array -pub fn as_float32_array(array: &dyn Array) -> Result<&Float32Array, DataFusionError> { +pub fn as_float32_array(array: &dyn Array) -> Result<&Float32Array> { Ok(downcast_value!(array, Float32Array)) } // Downcast ArrayRef to Float64Array -pub fn as_float64_array(array: &dyn Array) -> Result<&Float64Array, DataFusionError> { +pub fn as_float64_array(array: &dyn Array) -> Result<&Float64Array> { Ok(downcast_value!(array, Float64Array)) } // Downcast ArrayRef to StringArray -pub fn as_string_array(array: &dyn Array) -> Result<&StringArray, DataFusionError> { +pub fn as_string_array(array: &dyn Array) -> Result<&StringArray> { Ok(downcast_value!(array, StringArray)) } // Downcast ArrayRef to UInt32Array -pub fn as_uint32_array(array: &dyn Array) -> Result<&UInt32Array, DataFusionError> { +pub fn as_uint32_array(array: &dyn Array) -> Result<&UInt32Array> { Ok(downcast_value!(array, UInt32Array)) } // Downcast ArrayRef to UInt64Array -pub fn as_uint64_array(array: &dyn Array) -> Result<&UInt64Array, DataFusionError> { +pub fn as_uint64_array(array: &dyn Array) -> Result<&UInt64Array> { Ok(downcast_value!(array, UInt64Array)) } // Downcast ArrayRef to BooleanArray -pub fn as_boolean_array(array: &dyn Array) -> Result<&BooleanArray, DataFusionError> { +pub fn as_boolean_array(array: &dyn Array) -> Result<&BooleanArray> { Ok(downcast_value!(array, BooleanArray)) } // Downcast ArrayRef to ListArray -pub fn as_list_array(array: &dyn Array) -> Result<&ListArray, DataFusionError> { +pub fn as_list_array(array: &dyn Array) -> Result<&ListArray> { Ok(downcast_value!(array, ListArray)) } // Downcast ArrayRef to DictionaryArray pub fn as_dictionary_array( array: &dyn Array, -) -> Result<&DictionaryArray, DataFusionError> { +) -> Result<&DictionaryArray> { Ok(downcast_value!(array, DictionaryArray, T)) } // Downcast ArrayRef to GenericBinaryArray pub fn as_generic_binary_array( array: &dyn Array, -) -> Result<&GenericBinaryArray, DataFusionError> { +) -> Result<&GenericBinaryArray> { Ok(downcast_value!(array, GenericBinaryArray, T)) } // Downcast ArrayRef to GenericListArray pub fn as_generic_list_array( array: &dyn Array, -) -> Result<&GenericListArray, DataFusionError> { +) -> Result<&GenericListArray> { Ok(downcast_value!(array, GenericListArray, T)) } // Downcast ArrayRef to LargeListArray -pub fn as_large_list_array( - array: &dyn Array, -) -> Result<&LargeListArray, DataFusionError> { +pub fn as_large_list_array(array: &dyn Array) -> Result<&LargeListArray> { Ok(downcast_value!(array, LargeListArray)) } // Downcast ArrayRef to PrimitiveArray pub fn as_primitive_array( array: &dyn Array, -) -> Result<&PrimitiveArray, DataFusionError> { +) -> Result<&PrimitiveArray> { Ok(downcast_value!(array, PrimitiveArray, T)) } // Downcast ArrayRef to MapArray -pub fn as_map_array(array: &dyn Array) -> Result<&MapArray, DataFusionError> { +pub fn as_map_array(array: &dyn Array) -> Result<&MapArray> { Ok(downcast_value!(array, MapArray)) } // Downcast ArrayRef to NullArray -pub fn as_null_array(array: &dyn Array) -> Result<&NullArray, DataFusionError> { +pub fn as_null_array(array: &dyn Array) -> Result<&NullArray> { Ok(downcast_value!(array, NullArray)) } // Downcast ArrayRef to NullArray -pub fn as_union_array(array: &dyn Array) -> Result<&UnionArray, DataFusionError> { +pub fn as_union_array(array: &dyn Array) -> Result<&UnionArray> { Ok(downcast_value!(array, UnionArray)) } // Downcast ArrayRef to TimestampNanosecondArray pub fn as_timestamp_nanosecond_array( array: &dyn Array, -) -> Result<&TimestampNanosecondArray, DataFusionError> { +) -> Result<&TimestampNanosecondArray> { Ok(downcast_value!(array, TimestampNanosecondArray)) } // Downcast ArrayRef to TimestampMillisecondArray pub fn as_timestamp_millisecond_array( array: &dyn Array, -) -> Result<&TimestampMillisecondArray, DataFusionError> { +) -> Result<&TimestampMillisecondArray> { Ok(downcast_value!(array, TimestampMillisecondArray)) } // Downcast ArrayRef to TimestampMicrosecondArray pub fn as_timestamp_microsecond_array( array: &dyn Array, -) -> Result<&TimestampMicrosecondArray, DataFusionError> { +) -> Result<&TimestampMicrosecondArray> { Ok(downcast_value!(array, TimestampMicrosecondArray)) } // Downcast ArrayRef to TimestampSecondArray -pub fn as_timestamp_second_array( - array: &dyn Array, -) -> Result<&TimestampSecondArray, DataFusionError> { +pub fn as_timestamp_second_array(array: &dyn Array) -> Result<&TimestampSecondArray> { Ok(downcast_value!(array, TimestampSecondArray)) } // Downcast ArrayRef to BinaryArray -pub fn as_binary_array(array: &dyn Array) -> Result<&BinaryArray, DataFusionError> { +pub fn as_binary_array(array: &dyn Array) -> Result<&BinaryArray> { Ok(downcast_value!(array, BinaryArray)) } // Downcast ArrayRef to FixedSizeListArray -pub fn as_fixed_size_list_array( - array: &dyn Array, -) -> Result<&FixedSizeListArray, DataFusionError> { +pub fn as_fixed_size_list_array(array: &dyn Array) -> Result<&FixedSizeListArray> { Ok(downcast_value!(array, FixedSizeListArray)) } // Downcast ArrayRef to FixedSizeListArray -pub fn as_fixed_size_binary_array( - array: &dyn Array, -) -> Result<&FixedSizeBinaryArray, DataFusionError> { +pub fn as_fixed_size_binary_array(array: &dyn Array) -> Result<&FixedSizeBinaryArray> { Ok(downcast_value!(array, FixedSizeBinaryArray)) } // Downcast ArrayRef to Date64Array -pub fn as_date64_array(array: &dyn Array) -> Result<&Date64Array, DataFusionError> { +pub fn as_date64_array(array: &dyn Array) -> Result<&Date64Array> { Ok(downcast_value!(array, Date64Array)) } // Downcast ArrayRef to GenericBinaryArray pub fn as_generic_string_array( array: &dyn Array, -) -> Result<&GenericStringArray, DataFusionError> { +) -> Result<&GenericStringArray> { Ok(downcast_value!(array, GenericStringArray, T)) } diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index de769bbe8feae..b778b5f036b5d 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -29,7 +29,6 @@ use datafusion_physical_expr::hash_utils::create_hashes; use futures::ready; use futures::stream::{Stream, StreamExt}; -use crate::error::Result; use crate::execution::context::TaskContext; use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; use crate::physical_plan::aggregates::{ @@ -47,7 +46,7 @@ use arrow::compute::cast; use arrow::datatypes::{DataType, Schema, UInt32Type}; use arrow::{array::ArrayRef, compute}; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; -use datafusion_common::{DataFusionError, ScalarValue}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::Accumulator; use datafusion_row::accessor::RowAccessor; use datafusion_row::layout::RowLayout; diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 8c570cff8fd00..b2a561458ad8c 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -39,7 +39,7 @@ use crate::prelude::{CsvReadOptions, SessionContext}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; -use datafusion_common::{DataFusionError, Statistics, TableReference}; +use datafusion_common::{Statistics, TableReference}; use datafusion_expr::{CreateExternalTable, Expr, TableType}; use datafusion_physical_expr::PhysicalSortExpr; use futures::Stream; @@ -216,7 +216,7 @@ pub fn scan_empty( name: Option<&str>, table_schema: &Schema, projection: Option>, -) -> Result { +) -> Result { let table_schema = Arc::new(table_schema.clone()); let provider = Arc::new(EmptyTable::new(table_schema)); let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE).to_string()); @@ -229,7 +229,7 @@ pub fn scan_empty_with_partitions( table_schema: &Schema, projection: Option>, partitions: usize, -) -> Result { +) -> Result { let table_schema = Arc::new(table_schema.clone()); let provider = Arc::new(EmptyTable::new(table_schema).with_partitions(partitions)); let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE).to_string()); diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 30802805cee36..041497eb74fd0 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -29,7 +29,7 @@ use datafusion::physical_plan::file_format::{ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::{collect, Statistics}; use datafusion::prelude::SessionContext; -use datafusion_common::DataFusionError; +use datafusion_common::Result; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; use object_store::memory::InMemory; @@ -119,7 +119,7 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { file_meta: FileMeta, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, - ) -> Result, DataFusionError> { + ) -> Result> { let metadata = file_meta .extensions .as_ref() diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 0e0cfca920257..512926cc8b484 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -34,7 +34,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, - ScalarValue, TableReference, + Result, ScalarValue, TableReference, }; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug, Display, Formatter}; @@ -259,7 +259,7 @@ impl LogicalPlan { let mut exprs = vec![]; self.inspect_expressions(|e| { exprs.push(e.clone()); - Ok(()) as Result<(), DataFusionError> + Ok(()) as Result<()> }) // closure always returns OK .unwrap(); @@ -417,7 +417,7 @@ impl LogicalPlan { } /// returns all `Using` join columns in a logical plan - pub fn using_columns(&self) -> Result>, DataFusionError> { + pub fn using_columns(&self) -> Result>> { struct UsingJoinColumnVisitor { using_columns: Vec>, } @@ -437,7 +437,7 @@ impl LogicalPlan { on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| { accumu.insert(l.try_into_col()?); accumu.insert(r.try_into_col()?); - Result::<_, DataFusionError>::Ok(accumu) + Result::<_>::Ok(accumu) })?; self.using_columns.push(columns); } @@ -452,10 +452,7 @@ impl LogicalPlan { Ok(visitor.using_columns) } - pub fn with_new_inputs( - &self, - inputs: &[LogicalPlan], - ) -> Result { + pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> Result { from_plan(self, &self.expressions(), inputs) } @@ -464,7 +461,7 @@ impl LogicalPlan { pub fn with_param_values( self, param_values: Vec, - ) -> Result { + ) -> Result { match self { LogicalPlan::Prepare(prepare_lp) => { // Verify if the number of params matches the number of values @@ -658,12 +655,12 @@ impl LogicalPlan { pub fn replace_params_with_values( &self, param_values: &[ScalarValue], - ) -> Result { + ) -> Result { let new_exprs = self .expressions() .into_iter() .map(|e| Self::replace_placeholders_with_values(e, param_values)) - .collect::, DataFusionError>>()?; + .collect::>>()?; let new_inputs_with_values = self .inputs() @@ -675,9 +672,7 @@ impl LogicalPlan { } /// Walk the logical plan, find any `PlaceHolder` tokens, and return a map of their IDs and DataTypes - pub fn get_parameter_types( - &self, - ) -> Result>, DataFusionError> { + pub fn get_parameter_types(&self) -> Result>> { struct ParamTypeVisitor { param_types: HashMap>, } @@ -725,7 +720,7 @@ impl LogicalPlan { }; visitor = expr.accept(visitor)?; param_types.extend(visitor.param_types); - Ok(()) as Result<(), DataFusionError> + Ok(()) as Result<()> })?; self.param_types.extend(param_types); Ok(true) @@ -744,7 +739,7 @@ impl LogicalPlan { fn replace_placeholders_with_values( expr: Expr, param_values: &[ScalarValue], - ) -> Result { + ) -> Result { rewrite_expr(expr, |expr| { match &expr { Expr::Placeholder { id, data_type } => { @@ -1393,10 +1388,7 @@ pub struct Projection { impl Projection { /// Create a new Projection - pub fn try_new( - expr: Vec, - input: Arc, - ) -> Result { + pub fn try_new(expr: Vec, input: Arc) -> Result { let schema = Arc::new(DFSchema::new_with_metadata( exprlist_to_fields(&expr, &input)?, input.schema().metadata().clone(), @@ -1409,7 +1401,7 @@ impl Projection { expr: Vec, input: Arc, schema: DFSchemaRef, - ) -> Result { + ) -> Result { if expr.len() != schema.fields().len() { return Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len()))); } @@ -1917,7 +1909,7 @@ impl Join { left: Arc, right: Arc, column_on: (Vec, Vec), - ) -> Result { + ) -> Result { let original_join = match original { LogicalPlan::Join(join) => join, _ => return plan_err!("Could not create join with project input"), diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index d55338adc5ed9..556603f59641c 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -1035,7 +1035,7 @@ pub fn find_valid_equijoin_key_pair( right_schema.clone(), )?; - Result::<_, DataFusionError>::Ok(result) + Result::<_>::Ok(result) }; let join_key_pair = match (l_is_left, r_is_right) { diff --git a/datafusion/optimizer/src/simplify_expressions/regex.rs b/datafusion/optimizer/src/simplify_expressions/regex.rs index 038e1fcce9edf..13d170fd886f8 100644 --- a/datafusion/optimizer/src/simplify_expressions/regex.rs +++ b/datafusion/optimizer/src/simplify_expressions/regex.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::{DataFusionError, ScalarValue}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::{BinaryExpr, Expr, Like, Operator}; use regex_syntax::hir::{Hir, HirKind, Literal}; @@ -26,7 +26,7 @@ pub fn simplify_regex_expr( left: Box, op: Operator, right: Box, -) -> Result { +) -> Result { let mode = OperatorMode::new(&op); if let Expr::Literal(ScalarValue::Utf8(Some(pattern))) = right.as_ref() { diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index 235b07d9e86c5..ae38a216088df 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -18,7 +18,7 @@ //! Collection of utility functions that are leveraged by the query optimizer rules use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::{plan_err, Column, DFSchemaRef, DataFusionError}; +use datafusion_common::{plan_err, Column, DFSchemaRef}; use datafusion_common::{DFSchema, Result}; use datafusion_expr::expr::{BinaryExpr, Sort}; use datafusion_expr::expr_rewriter::{ @@ -535,7 +535,7 @@ pub(crate) fn collect_subquery_cols( } cols.extend(using_cols); - Result::<_, DataFusionError>::Ok(cols) + Result::<_>::Ok(cols) }) } diff --git a/datafusion/physical-expr/src/aggregate/variance.rs b/datafusion/physical-expr/src/aggregate/variance.rs index 657103e43e45b..3aa1d9bf1cc90 100644 --- a/datafusion/physical-expr/src/aggregate/variance.rs +++ b/datafusion/physical-expr/src/aggregate/variance.rs @@ -31,8 +31,7 @@ use arrow::{ datatypes::Field, }; use datafusion_common::downcast_value; -use datafusion_common::ScalarValue; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::Accumulator; /// VAR and VAR_SAMP aggregate expression diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 5b357d9318215..b5ca59c03d389 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -20,7 +20,7 @@ use crate::expressions::{BinaryExpr, Column, UnKnownColumn}; use crate::rewrite::{TreeNodeRewritable, TreeNodeRewriter}; use crate::{EquivalenceProperties, PhysicalExpr, PhysicalSortExpr}; use arrow::datatypes::SchemaRef; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::Result; use datafusion_expr::Operator; use petgraph::graph::NodeIndex; @@ -376,7 +376,7 @@ pub fn reassign_predicate_columns( pred: Arc, schema: &SchemaRef, ignore_not_found: bool, -) -> Result, DataFusionError> { +) -> Result> { pred.transform(&|expr| { if let Some(column) = expr.as_any().downcast_ref::() { let index = match schema.index_of(column.name()) { diff --git a/datafusion/proto/src/common.rs b/datafusion/proto/src/common.rs index d74083850abb5..ed826f5874137 100644 --- a/datafusion/proto/src/common.rs +++ b/datafusion/proto/src/common.rs @@ -15,16 +15,16 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::DataFusionError; +use datafusion_common::{DataFusionError, Result}; -pub fn csv_delimiter_to_string(b: u8) -> Result { +pub fn csv_delimiter_to_string(b: u8) -> Result { let b = &[b]; let b = std::str::from_utf8(b) .map_err(|_| DataFusionError::Internal("Invalid CSV delimiter".to_owned()))?; Ok(b.to_owned()) } -pub fn str_to_byte(s: &String) -> Result { +pub fn str_to_byte(s: &String) -> Result { if s.len() != 1 { return Err(DataFusionError::Internal( "Invalid CSV delimiter".to_owned(), @@ -33,7 +33,7 @@ pub fn str_to_byte(s: &String) -> Result { Ok(s.as_bytes()[0]) } -pub fn byte_to_string(b: u8) -> Result { +pub fn byte_to_string(b: u8) -> Result { let b = &[b]; let b = std::str::from_utf8(b) .map_err(|_| DataFusionError::Internal("Invalid CSV delimiter".to_owned()))?; diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index aa416e63b8a63..845cb60d17a93 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -29,7 +29,7 @@ use arrow::datatypes::{ }; use datafusion::execution::registry::FunctionRegistry; use datafusion_common::{ - Column, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, + Column, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, ScalarValue, }; use datafusion_expr::{ @@ -1382,7 +1382,7 @@ pub fn parse_expr( } /// Parse an optional escape_char for Like, ILike, SimilarTo -fn parse_escape_char(s: &str) -> Result, DataFusionError> { +fn parse_escape_char(s: &str) -> Result> { match s.len() { 0 => Ok(None), 1 => Ok(s.chars().next()), diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 2a8d24d4103b1..7a8233f701403 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -40,6 +40,7 @@ use datafusion::{ }; use datafusion_common::{ context, parsers::CompressionTypeVariant, DataFusionError, OwnedTableReference, + Result, }; use datafusion_expr::{ logical_plan::{ @@ -72,11 +73,11 @@ impl From for DataFusionError { } pub trait AsLogicalPlan: Debug + Send + Sync + Clone { - fn try_decode(buf: &[u8]) -> Result + fn try_decode(buf: &[u8]) -> Result where Self: Sized; - fn try_encode(&self, buf: &mut B) -> Result<(), DataFusionError> + fn try_encode(&self, buf: &mut B) -> Result<()> where B: BufMut, Self: Sized; @@ -85,12 +86,12 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone { &self, ctx: &SessionContext, extension_codec: &dyn LogicalExtensionCodec, - ) -> Result; + ) -> Result; fn try_from_logical_plan( plan: &LogicalPlan, extension_codec: &dyn LogicalExtensionCodec, - ) -> Result + ) -> Result where Self: Sized; } @@ -101,26 +102,22 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync { buf: &[u8], inputs: &[LogicalPlan], ctx: &SessionContext, - ) -> Result; + ) -> Result; - fn try_encode( - &self, - node: &Extension, - buf: &mut Vec, - ) -> Result<(), DataFusionError>; + fn try_encode(&self, node: &Extension, buf: &mut Vec) -> Result<()>; fn try_decode_table_provider( &self, buf: &[u8], schema: SchemaRef, ctx: &SessionContext, - ) -> Result, DataFusionError>; + ) -> Result>; fn try_encode_table_provider( &self, node: Arc, buf: &mut Vec, - ) -> Result<(), DataFusionError>; + ) -> Result<()>; } #[derive(Debug, Clone)] @@ -132,17 +129,13 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { _buf: &[u8], _inputs: &[LogicalPlan], _ctx: &SessionContext, - ) -> Result { + ) -> Result { Err(DataFusionError::NotImplemented( "LogicalExtensionCodec is not provided".to_string(), )) } - fn try_encode( - &self, - _node: &Extension, - _buf: &mut Vec, - ) -> Result<(), DataFusionError> { + fn try_encode(&self, _node: &Extension, _buf: &mut Vec) -> Result<()> { Err(DataFusionError::NotImplemented( "LogicalExtensionCodec is not provided".to_string(), )) @@ -153,7 +146,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { _buf: &[u8], _schema: SchemaRef, _ctx: &SessionContext, - ) -> Result, DataFusionError> { + ) -> Result> { Err(DataFusionError::NotImplemented( "LogicalExtensionCodec is not provided".to_string(), )) @@ -163,7 +156,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { &self, _node: Arc, _buf: &mut Vec, - ) -> Result<(), DataFusionError> { + ) -> Result<()> { Err(DataFusionError::NotImplemented( "LogicalExtensionCodec is not provided".to_string(), )) @@ -184,7 +177,7 @@ macro_rules! into_logical_plan { fn from_owned_table_reference( table_ref: Option<&protobuf::OwnedTableReference>, error_context: &str, -) -> Result { +) -> Result { let table_ref = table_ref.ok_or_else(|| { DataFusionError::Internal(format!( "Protobuf deserialization error, {error_context} was missing required field name." @@ -195,7 +188,7 @@ fn from_owned_table_reference( } impl AsLogicalPlan for LogicalPlanNode { - fn try_decode(buf: &[u8]) -> Result + fn try_decode(buf: &[u8]) -> Result where Self: Sized, { @@ -204,7 +197,7 @@ impl AsLogicalPlan for LogicalPlanNode { }) } - fn try_encode(&self, buf: &mut B) -> Result<(), DataFusionError> + fn try_encode(&self, buf: &mut B) -> Result<()> where B: BufMut, Self: Sized, @@ -218,7 +211,7 @@ impl AsLogicalPlan for LogicalPlanNode { &self, ctx: &SessionContext, extension_codec: &dyn LogicalExtensionCodec, - ) -> Result { + ) -> Result { let plan = self.logical_plan_type.as_ref().ok_or_else(|| { proto_error(format!( "logical_plan::from_proto() Unsupported logical plan '{self:?}'" @@ -683,7 +676,7 @@ impl AsLogicalPlan for LogicalPlanNode { .inputs .iter() .map(|i| i.try_into_logical_plan(ctx, extension_codec)) - .collect::>()?; + .collect::>()?; if input_plans.len() < 2 { return Err( DataFusionError::Internal(String::from( @@ -710,7 +703,7 @@ impl AsLogicalPlan for LogicalPlanNode { let input_plans: Vec = inputs .iter() .map(|i| i.try_into_logical_plan(ctx, extension_codec)) - .collect::>()?; + .collect::>()?; let extension_node = extension_codec.try_decode(node, &input_plans, ctx)?; @@ -773,7 +766,7 @@ impl AsLogicalPlan for LogicalPlanNode { fn try_from_logical_plan( plan: &LogicalPlan, extension_codec: &dyn LogicalExtensionCodec, - ) -> Result + ) -> Result where Self: Sized, { @@ -1286,7 +1279,7 @@ impl AsLogicalPlan for LogicalPlanNode { extension_codec, ) }) - .collect::>()?; + .collect::>()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Union( protobuf::UnionNode { inputs }, @@ -1325,7 +1318,7 @@ impl AsLogicalPlan for LogicalPlanNode { extension_codec, ) }) - .collect::>()?; + .collect::>()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Extension( @@ -1406,7 +1399,7 @@ mod roundtrip_tests { create_udf, CsvReadOptions, SessionConfig, SessionContext, }; use datafusion::test_util::{TestTableFactory, TestTableProvider}; - use datafusion_common::{DFSchemaRef, DataFusionError, ScalarValue}; + use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue}; use datafusion_expr::expr::{ self, Between, BinaryExpr, Case, Cast, GroupingSet, Like, Sort, }; @@ -1456,7 +1449,7 @@ mod roundtrip_tests { } #[tokio::test] - async fn roundtrip_logical_plan() -> Result<(), DataFusionError> { + async fn roundtrip_logical_plan() -> Result<()> { let ctx = SessionContext::new(); ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default()) .await?; @@ -1489,17 +1482,13 @@ mod roundtrip_tests { _buf: &[u8], _inputs: &[LogicalPlan], _ctx: &SessionContext, - ) -> Result { + ) -> Result { Err(DataFusionError::NotImplemented( "No extension codec provided".to_string(), )) } - fn try_encode( - &self, - _node: &Extension, - _buf: &mut Vec, - ) -> Result<(), DataFusionError> { + fn try_encode(&self, _node: &Extension, _buf: &mut Vec) -> Result<()> { Err(DataFusionError::NotImplemented( "No extension codec provided".to_string(), )) @@ -1510,7 +1499,7 @@ mod roundtrip_tests { buf: &[u8], schema: SchemaRef, _ctx: &SessionContext, - ) -> Result, DataFusionError> { + ) -> Result> { let msg = TestTableProto::decode(buf).map_err(|_| { DataFusionError::Internal("Error decoding test table".to_string()) })?; @@ -1525,7 +1514,7 @@ mod roundtrip_tests { &self, node: Arc, buf: &mut Vec, - ) -> Result<(), DataFusionError> { + ) -> Result<()> { let table = node .as_ref() .as_any() @@ -1541,7 +1530,7 @@ mod roundtrip_tests { } #[tokio::test] - async fn roundtrip_custom_tables() -> Result<(), DataFusionError> { + async fn roundtrip_custom_tables() -> Result<()> { let mut table_factories: HashMap> = HashMap::new(); table_factories.insert("TESTTABLE".to_string(), Arc::new(TestTableFactory {})); @@ -1566,7 +1555,7 @@ mod roundtrip_tests { } #[tokio::test] - async fn roundtrip_logical_plan_aggregation() -> Result<(), DataFusionError> { + async fn roundtrip_logical_plan_aggregation() -> Result<()> { let ctx = SessionContext::new(); let schema = Schema::new(vec![ @@ -1593,7 +1582,7 @@ mod roundtrip_tests { } #[tokio::test] - async fn roundtrip_single_count_distinct() -> Result<(), DataFusionError> { + async fn roundtrip_single_count_distinct() -> Result<()> { let ctx = SessionContext::new(); let schema = Schema::new(vec![ @@ -1619,7 +1608,7 @@ mod roundtrip_tests { } #[tokio::test] - async fn roundtrip_logical_plan_with_extension() -> Result<(), DataFusionError> { + async fn roundtrip_logical_plan_with_extension() -> Result<()> { let ctx = SessionContext::new(); ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default()) .await?; @@ -1631,7 +1620,7 @@ mod roundtrip_tests { } #[tokio::test] - async fn roundtrip_logical_plan_with_view_scan() -> Result<(), DataFusionError> { + async fn roundtrip_logical_plan_with_view_scan() -> Result<()> { let ctx = SessionContext::new(); ctx.register_csv("t1", "testdata/test.csv", CsvReadOptions::default()) .await?; @@ -1728,7 +1717,7 @@ mod roundtrip_tests { buf: &[u8], inputs: &[LogicalPlan], ctx: &SessionContext, - ) -> Result { + ) -> Result { if let Some((input, _)) = inputs.split_first() { let proto = proto::TopKPlanProto::decode(buf).map_err(|e| { DataFusionError::Internal(format!( @@ -1758,11 +1747,7 @@ mod roundtrip_tests { } } - fn try_encode( - &self, - node: &Extension, - buf: &mut Vec, - ) -> Result<(), DataFusionError> { + fn try_encode(&self, node: &Extension, buf: &mut Vec) -> Result<()> { if let Some(exec) = node.node.as_any().downcast_ref::() { let proto = proto::TopKPlanProto { k: exec.k as u64, @@ -1788,7 +1773,7 @@ mod roundtrip_tests { _buf: &[u8], _schema: SchemaRef, _ctx: &SessionContext, - ) -> Result, DataFusionError> { + ) -> Result> { Err(DataFusionError::Internal( "unsupported plan type".to_string(), )) @@ -1798,7 +1783,7 @@ mod roundtrip_tests { &self, _node: Arc, _buf: &mut Vec, - ) -> Result<(), DataFusionError> { + ) -> Result<()> { Err(DataFusionError::Internal( "unsupported plan type".to_string(), )) diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 40ab07175f74c..b38ea2b35e37b 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -40,7 +40,7 @@ use datafusion::physical_plan::{ functions, Partitioning, }; use datafusion::physical_plan::{ColumnStatistics, PhysicalExpr, Statistics}; -use datafusion_common::DataFusionError; +use datafusion_common::{DataFusionError, Result}; use object_store::path::Path; use object_store::ObjectMeta; use std::convert::{TryFrom, TryInto}; @@ -72,7 +72,7 @@ pub fn parse_physical_expr( proto: &protobuf::PhysicalExprNode, registry: &dyn FunctionRegistry, input_schema: &Schema, -) -> Result, DataFusionError> { +) -> Result> { let expr_type = proto .expr_type .as_ref() @@ -197,7 +197,7 @@ pub fn parse_physical_expr( )?, )) }) - .collect::, DataFusionError>>()?, + .collect::>>()?, e.else_expr .as_ref() .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema)) @@ -304,7 +304,7 @@ fn parse_required_physical_expr( registry: &dyn FunctionRegistry, field: &str, input_schema: &Schema, -) -> Result, DataFusionError> { +) -> Result> { expr.map(|e| parse_physical_expr(e, registry, input_schema)) .transpose()? .ok_or_else(|| { @@ -346,7 +346,7 @@ pub fn parse_protobuf_hash_partitioning( partitioning: Option<&protobuf::PhysicalHashRepartition>, registry: &dyn FunctionRegistry, input_schema: &Schema, -) -> Result, DataFusionError> { +) -> Result> { match partitioning { Some(hash_part) => { let expr = hash_part @@ -367,7 +367,7 @@ pub fn parse_protobuf_hash_partitioning( pub fn parse_protobuf_file_scan_config( proto: &protobuf::FileScanExecConf, registry: &dyn FunctionRegistry, -) -> Result { +) -> Result { let schema: Arc = Arc::new(convert_required!(proto.schema)?); let projection = proto .projection @@ -402,7 +402,7 @@ pub fn parse_protobuf_file_scan_config( schema.field_with_name(col)?.data_type().clone(), )) }) - .collect::, DataFusionError>>()?; + .collect::>>()?; let output_ordering = proto .output_ordering @@ -421,7 +421,7 @@ pub fn parse_protobuf_file_scan_config( }, }) }) - .collect::, DataFusionError>>()?; + .collect::>>()?; let output_ordering = if output_ordering.is_empty() { None } else { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index c31ff37474339..92986b0b39b26 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -47,7 +47,7 @@ use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec}; use datafusion::physical_plan::{ AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr, }; -use datafusion_common::DataFusionError; +use datafusion_common::{DataFusionError, Result}; use prost::bytes::BufMut; use prost::Message; @@ -66,7 +66,7 @@ pub mod from_proto; pub mod to_proto; impl AsExecutionPlan for PhysicalPlanNode { - fn try_decode(buf: &[u8]) -> Result + fn try_decode(buf: &[u8]) -> Result where Self: Sized, { @@ -75,7 +75,7 @@ impl AsExecutionPlan for PhysicalPlanNode { }) } - fn try_encode(&self, buf: &mut B) -> Result<(), DataFusionError> + fn try_encode(&self, buf: &mut B) -> Result<()> where B: BufMut, Self: Sized, @@ -91,7 +91,7 @@ impl AsExecutionPlan for PhysicalPlanNode { registry: &dyn FunctionRegistry, runtime: &RuntimeEnv, extension_codec: &dyn PhysicalExtensionCodec, - ) -> Result, DataFusionError> { + ) -> Result> { let plan = self.physical_plan_type.as_ref().ok_or_else(|| { proto_error(format!( "physical_plan::from_proto() Unsupported physical plan '{self:?}'" @@ -118,9 +118,13 @@ impl AsExecutionPlan for PhysicalPlanNode { .expr .iter() .zip(projection.expr_name.iter()) - .map(|(expr, name)| Ok((parse_physical_expr(expr,registry, input.schema().as_ref())?, name.to_string()))) - .collect::, String)>, DataFusionError>>( - )?; + .map(|(expr, name)| { + Ok(( + parse_physical_expr(expr, registry, input.schema().as_ref())?, + name.to_string(), + )) + }) + .collect::, String)>>>()?; Ok(Arc::new(ProjectionExec::try_new(exprs, input)?)) } PhysicalPlanType::Filter(filter) => { @@ -472,7 +476,7 @@ impl AsExecutionPlan for PhysicalPlanNode { let right = into_required!(col.right)?; Ok((left, right)) }) - .collect::>()?; + .collect::>()?; let join_type = protobuf::JoinType::from_i32(hashjoin.join_type) .ok_or_else(|| { proto_error(format!( @@ -510,11 +514,11 @@ impl AsExecutionPlan for PhysicalPlanNode { side: side.into(), }) }) - .collect::, DataFusionError>>()?; + .collect::>>()?; Ok(JoinFilter::new(expression, column_indices, schema)) }) - .map_or(Ok(None), |v: Result| v.map(Some))?; + .map_or(Ok(None), |v: Result| v.map(Some))?; let partition_mode = protobuf::PartitionMode::from_i32(hashjoin.partition_mode) @@ -660,7 +664,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .inputs .iter() .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec)) - .collect::>()?; + .collect::>()?; let extension_node = extension_codec.try_decode( extension.node.as_slice(), @@ -676,7 +680,7 @@ impl AsExecutionPlan for PhysicalPlanNode { fn try_from_physical_plan( plan: Arc, extension_codec: &dyn PhysicalExtensionCodec, - ) -> Result + ) -> Result where Self: Sized, { @@ -706,7 +710,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .expr() .iter() .map(|expr| expr.0.clone().try_into()) - .collect::, DataFusionError>>()?; + .collect::>>()?; let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect(); Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Projection(Box::new( @@ -808,10 +812,7 @@ impl AsExecutionPlan for PhysicalPlanNode { schema: Some(schema), }) }) - .map_or( - Ok(None), - |v: Result| v.map(Some), - )?; + .map_or(Ok(None), |v: Result| v.map(Some))?; let partition_mode = match exec.partition_mode() { PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft, @@ -869,7 +870,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .aggr_expr() .iter() .map(|expr| expr.to_owned().try_into()) - .collect::, DataFusionError>>()?; + .collect::>>()?; let agg_names = exec .aggr_expr() .iter() @@ -877,7 +878,7 @@ impl AsExecutionPlan for PhysicalPlanNode { Ok(field) => Ok(field.name().clone()), Err(e) => Err(e), }) - .collect::>()?; + .collect::>()?; let agg_mode = match exec.mode() { AggregateMode::Partial => protobuf::AggregateMode::Partial, @@ -897,14 +898,14 @@ impl AsExecutionPlan for PhysicalPlanNode { .null_expr() .iter() .map(|expr| expr.0.to_owned().try_into()) - .collect::, DataFusionError>>()?; + .collect::>>()?; let group_expr = exec .group_expr() .expr() .iter() .map(|expr| expr.0.to_owned().try_into()) - .collect::, DataFusionError>>()?; + .collect::>>()?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new( @@ -1000,7 +1001,7 @@ impl AsExecutionPlan for PhysicalPlanNode { hash_expr: exprs .iter() .map(|expr| expr.clone().try_into()) - .collect::, DataFusionError>>()?, + .collect::>>()?, partition_count: *partition_count as u64, }) } @@ -1040,7 +1041,7 @@ impl AsExecutionPlan for PhysicalPlanNode { )), }) }) - .collect::, DataFusionError>>()?; + .collect::>>()?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Sort(Box::new( protobuf::SortExecNode { @@ -1087,7 +1088,7 @@ impl AsExecutionPlan for PhysicalPlanNode { )), }) }) - .collect::, DataFusionError>>()?; + .collect::>>()?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge( Box::new(protobuf::SortPreservingMergeExecNode { @@ -1109,7 +1110,7 @@ impl AsExecutionPlan for PhysicalPlanNode { extension_codec, ) }) - .collect::>()?; + .collect::>()?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Extension( @@ -1126,11 +1127,11 @@ impl AsExecutionPlan for PhysicalPlanNode { } pub trait AsExecutionPlan: Debug + Send + Sync + Clone { - fn try_decode(buf: &[u8]) -> Result + fn try_decode(buf: &[u8]) -> Result where Self: Sized; - fn try_encode(&self, buf: &mut B) -> Result<(), DataFusionError> + fn try_encode(&self, buf: &mut B) -> Result<()> where B: BufMut, Self: Sized; @@ -1140,12 +1141,12 @@ pub trait AsExecutionPlan: Debug + Send + Sync + Clone { registry: &dyn FunctionRegistry, runtime: &RuntimeEnv, extension_codec: &dyn PhysicalExtensionCodec, - ) -> Result, DataFusionError>; + ) -> Result>; fn try_from_physical_plan( plan: Arc, extension_codec: &dyn PhysicalExtensionCodec, - ) -> Result + ) -> Result where Self: Sized; } @@ -1156,13 +1157,9 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync { buf: &[u8], inputs: &[Arc], registry: &dyn FunctionRegistry, - ) -> Result, DataFusionError>; + ) -> Result>; - fn try_encode( - &self, - node: Arc, - buf: &mut Vec, - ) -> Result<(), DataFusionError>; + fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()>; } #[derive(Debug)] @@ -1174,7 +1171,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { _buf: &[u8], _inputs: &[Arc], _registry: &dyn FunctionRegistry, - ) -> Result, DataFusionError> { + ) -> Result> { Err(DataFusionError::NotImplemented( "PhysicalExtensionCodec is not provided".to_string(), )) @@ -1184,7 +1181,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { &self, _node: Arc, _buf: &mut Vec, - ) -> Result<(), DataFusionError> { + ) -> Result<()> { Err(DataFusionError::NotImplemented( "PhysicalExtensionCodec is not provided".to_string(), )) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 622f9d780a840..9210a2d7fafa2 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -47,7 +47,7 @@ use datafusion::logical_expr::BuiltinScalarFunction; use datafusion::physical_expr::expressions::{DateTimeIntervalExpr, GetIndexedFieldExpr}; use datafusion::physical_expr::ScalarFunctionExpr; use datafusion::physical_plan::joins::utils::JoinSide; -use datafusion_common::DataFusionError; +use datafusion_common::{DataFusionError, Result}; impl TryFrom> for protobuf::PhysicalExprNode { type Error = DataFusionError; @@ -139,7 +139,7 @@ impl TryFrom> for protobuf::PhysicalExprNode { .expressions() .iter() .map(|e| e.clone().try_into()) - .collect::, DataFusionError>>()?; + .collect::>>()?; Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( protobuf::PhysicalAggregateExprNode { @@ -364,7 +364,7 @@ impl TryFrom> for protobuf::PhysicalExprNode { fn try_parse_when_then_expr( when_expr: &Arc, then_expr: &Arc, -) -> Result { +) -> Result { Ok(protobuf::PhysicalWhenThen { when_expr: Some(when_expr.clone().try_into()?), then_expr: Some(then_expr.clone().try_into()?), @@ -462,7 +462,7 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf { nulls_first: o.options.nulls_first, }) }) - .collect::, DataFusionError>>()? + .collect::>>()? } else { vec![] };