diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index a541091e3a2b2..6a14c0ee8b48d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -98,7 +98,7 @@ jobs: with: rust-version: stable - name: Run tests (excluding doctests) - run: cargo test --lib --tests --bins --features avro,json,backtrace + run: RUST_MIN_STACK=504857600 cargo test --lib --tests --bins --features avro,json,backtrace - name: Verify Working Directory Clean run: git diff --exit-code diff --git a/datafusion-examples/examples/bench.py b/datafusion-examples/examples/bench.py new file mode 100644 index 0000000000000..f9f8e2e7c57eb --- /dev/null +++ b/datafusion-examples/examples/bench.py @@ -0,0 +1,88 @@ +import polars as pl +import time +from datetime import date +from datafusion import SessionContext + +t = time.time() + +#file = "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet" + + +file = "/home/dev/arrow-datafusion/test_out/benchon.parquet" +#file = "/home/dev/arrow-datafusion/test_out/uncompressed.parquet" + + +# Create a DataFusion context +ctx = SessionContext() + +# Register table with context +ctx.register_parquet('test', file) + +times = [] +for i in range(5): + + t = time.time() + df = pl.scan_parquet(file) \ + .filter(pl.col("l_shipdate") <= date(1998, 9, 2)) \ + .group_by("l_returnflag", "l_linestatus") \ + .agg([ + pl.col("l_quantity").sum().alias("sum_qty"), + pl.col("l_extendedprice").sum().alias("sum_base_price"), + (pl.col("l_extendedprice") * (1 - pl.col("l_discount"))).sum().alias("sum_disc_price"), + (pl.col("l_extendedprice") * (1 - pl.col("l_discount")) * (1 + pl.col("l_tax"))).sum().alias("sum_charge"), + pl.col("l_quantity").mean().alias("avg_qty"), + pl.col("l_extendedprice").mean().alias("avg_price"), + pl.col("l_discount").mean().alias("avg_disc"), + pl.count().alias("count_order") + ] + ) \ + .sort([pl.col("l_returnflag"), pl.col("l_linestatus")]) + df = df.collect() + + print(f"polars agg query {time.time()-t}s") + + #t = time.time() + #pl.scan_parquet(file).sink_parquet("test_out/pl.parquet") + #print(f"polars re-endcode job {time.time()-t}") + + + t = time.time() + + + query = """ + select + l_returnflag, + l_linestatus, + sum(l_quantity) as sum_qty, + sum(l_extendedprice) as sum_base_price, + sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, + sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, + avg(l_quantity) as avg_qty, + avg(l_extendedprice) as avg_price, + avg(l_discount) as avg_disc, + count(*) as count_order +from + test +where + l_shipdate <= date '1998-09-02' +group by + l_returnflag, + l_linestatus +order by + l_returnflag, + l_linestatus; + """ + + # Execute SQL + df = ctx.sql(f"{query}").cache() + elapsed = time.time() - t + times.append(elapsed) + print(f"datafusion agg query {elapsed}s") + + # t = time.time() + # df = ctx.sql("copy test to 'test_out/df.parquet'") + # df.show() + # print(f"datafusion reendcode job {time.time() - t}s") + +print(sum(times)/len(times)) + diff --git a/datafusion-examples/test_csv/5idJzooCjySkTpLf_0.csv b/datafusion-examples/test_csv/5idJzooCjySkTpLf_0.csv new file mode 100644 index 0000000000000..f4263141197dd Binary files /dev/null and b/datafusion-examples/test_csv/5idJzooCjySkTpLf_0.csv differ diff --git a/datafusion-examples/test_csv/HGQahh0J2jquvQ6M_0.csv b/datafusion-examples/test_csv/HGQahh0J2jquvQ6M_0.csv new file mode 100644 index 0000000000000..7908fb2e6579f --- /dev/null +++ b/datafusion-examples/test_csv/HGQahh0J2jquvQ6M_0.csv @@ -0,0 +1,4 @@ +tablecol1 +a +b +c diff --git a/datafusion-examples/test_json/Ml2qxq3EGuEpgtz6_0.json b/datafusion-examples/test_json/Ml2qxq3EGuEpgtz6_0.json new file mode 100644 index 0000000000000..58c2306646888 --- /dev/null +++ b/datafusion-examples/test_json/Ml2qxq3EGuEpgtz6_0.json @@ -0,0 +1,3 @@ +{"tablecol1":"a"} +{"tablecol1":"b"} +{"tablecol1":"c"} diff --git a/datafusion-examples/test_json/i87q26cL7eYNxxnQ_0.json b/datafusion-examples/test_json/i87q26cL7eYNxxnQ_0.json new file mode 100644 index 0000000000000..58c2306646888 --- /dev/null +++ b/datafusion-examples/test_json/i87q26cL7eYNxxnQ_0.json @@ -0,0 +1,3 @@ +{"tablecol1":"a"} +{"tablecol1":"b"} +{"tablecol1":"c"} diff --git a/datafusion-examples/test_parquet/YTByOUmxGAgr4Z8t_0.parquet b/datafusion-examples/test_parquet/YTByOUmxGAgr4Z8t_0.parquet new file mode 100644 index 0000000000000..f5638ede43618 Binary files /dev/null and b/datafusion-examples/test_parquet/YTByOUmxGAgr4Z8t_0.parquet differ diff --git a/datafusion-examples/test_parquet/e9tydLQCWeJ8ul8D_0.parquet b/datafusion-examples/test_parquet/e9tydLQCWeJ8ul8D_0.parquet new file mode 100644 index 0000000000000..f5638ede43618 Binary files /dev/null and b/datafusion-examples/test_parquet/e9tydLQCWeJ8ul8D_0.parquet differ diff --git a/datafusion-examples/test_table/EkdN1r77DpYeMh3w_0.parquet b/datafusion-examples/test_table/EkdN1r77DpYeMh3w_0.parquet new file mode 100644 index 0000000000000..f5638ede43618 Binary files /dev/null and b/datafusion-examples/test_table/EkdN1r77DpYeMh3w_0.parquet differ diff --git a/datafusion-examples/test_table/oUn5SdwSqetWEpqU_0.parquet b/datafusion-examples/test_table/oUn5SdwSqetWEpqU_0.parquet new file mode 100644 index 0000000000000..f5638ede43618 Binary files /dev/null and b/datafusion-examples/test_table/oUn5SdwSqetWEpqU_0.parquet differ diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/common/src/file_options/file_type.rs index 97362bdad3ccc..7bc598806612f 100644 --- a/datafusion/common/src/file_options/file_type.rs +++ b/datafusion/common/src/file_options/file_type.rs @@ -18,9 +18,11 @@ //! File type abstraction use crate::error::{DataFusionError, Result}; +use crate::parsers::CompressionTypeVariant; use core::fmt; use std::fmt::Display; +use std::hash::Hasher; use std::str::FromStr; /// The default file extension of arrow files @@ -40,93 +42,39 @@ pub trait GetExt { fn get_ext(&self) -> String; } -/// Readable file type -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum FileType { - /// Apache Arrow file - ARROW, - /// Apache Avro file - AVRO, - /// Apache Parquet file - #[cfg(feature = "parquet")] - PARQUET, - /// CSV file - CSV, - /// JSON file - JSON, -} +/// A trait which provides information during planning time about a type of file which may be defined +/// externally. Use SessionContext::register_file_type to add new implementations. +pub trait FileType: + std::fmt::Debug + FileTypeClone + Send + Sync +{ + /// Returns the default file extension for this type, e.g. CSV would return ".csv".to_owned() + /// The default_extension is also used to uniquely identify a specific FileType::Extension variant, + /// so ensure this String is unique from any built in FileType and any other ExtensionFileTypes + /// defined. + fn default_extension(&self) -> String; -impl GetExt for FileType { - fn get_ext(&self) -> String { - match self { - FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(), - FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(), - #[cfg(feature = "parquet")] - FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(), - FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(), - FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(), - } - } + /// Returns the file extension when it is compressed with a given [CompressionTypeVariant] + fn extension_with_compression( + &self, + compression: CompressionTypeVariant, + ) -> Result; } -impl Display for FileType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let out = match self { - FileType::CSV => "csv", - FileType::JSON => "json", - #[cfg(feature = "parquet")] - FileType::PARQUET => "parquet", - FileType::AVRO => "avro", - FileType::ARROW => "arrow", - }; - write!(f, "{}", out) - } +pub trait FileTypeClone { + fn clone_box(&self) -> Box; } -impl FromStr for FileType { - type Err = DataFusionError; - - fn from_str(s: &str) -> Result { - let s = s.to_uppercase(); - match s.as_str() { - "ARROW" => Ok(FileType::ARROW), - "AVRO" => Ok(FileType::AVRO), - #[cfg(feature = "parquet")] - "PARQUET" => Ok(FileType::PARQUET), - "CSV" => Ok(FileType::CSV), - "JSON" | "NDJSON" => Ok(FileType::JSON), - _ => Err(DataFusionError::NotImplemented(format!( - "Unknown FileType: {s}" - ))), - } +impl Clone for Box { + fn clone(&self) -> Box { + self.clone_box() } } -#[cfg(test)] -#[cfg(feature = "parquet")] -mod tests { - use crate::error::DataFusionError; - use crate::file_options::FileType; - use std::str::FromStr; - - #[test] - fn from_str() { - for (ext, file_type) in [ - ("csv", FileType::CSV), - ("CSV", FileType::CSV), - ("json", FileType::JSON), - ("JSON", FileType::JSON), - ("avro", FileType::AVRO), - ("AVRO", FileType::AVRO), - ("parquet", FileType::PARQUET), - ("PARQUET", FileType::PARQUET), - ] { - assert_eq!(FileType::from_str(ext).unwrap(), file_type); - } - - assert!(matches!( - FileType::from_str("Unknown"), - Err(DataFusionError::NotImplemented(_)) - )); +impl std::hash::Hash for Box { + fn hash(&self, state: &mut H) + where + H: Hasher, + { + self.default_extension().hash(state) } } diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index 1d661b17eb1c0..a5ab20c517aff 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -97,37 +97,6 @@ impl StatementOptions { maybe_option.map(|(_, v)| v) } - /// Infers the file_type given a target and arbitrary options. - /// If the options contain an explicit "format" option, that will be used. - /// Otherwise, attempt to infer file_type from the extension of target. - /// Finally, return an error if unable to determine the file_type - /// If found, format is removed from the options list. - pub fn try_infer_file_type(&mut self, target: &str) -> Result { - let explicit_format = self.scan_and_remove_option("format"); - let format = match explicit_format { - Some(s) => FileType::from_str(s.1.as_str()), - None => { - // try to infer file format from file extension - let extension: &str = &Path::new(target) - .extension() - .ok_or(DataFusionError::Configuration( - "Format not explicitly set and unable to get file extension!" - .to_string(), - ))? - .to_str() - .ok_or(DataFusionError::Configuration( - "Format not explicitly set and failed to parse file extension!" - .to_string(), - ))? - .to_lowercase(); - - FileType::from_str(extension) - } - }?; - - Ok(format) - } - /// Finds an option in StatementOptions if exists, removes and returns it /// along with the vec of remaining options. fn scan_and_remove_option(&mut self, find: &str) -> Option { @@ -142,158 +111,6 @@ impl StatementOptions { } } -/// This type contains all options needed to initialize a particular -/// RecordBatchWriter type. Each element in the enum contains a thin wrapper -/// around a "writer builder" type (e.g. arrow::csv::WriterBuilder) -/// plus any DataFusion specific writing options (e.g. CSV compression) -#[derive(Clone, Debug)] -pub enum FileTypeWriterOptions { - #[cfg(feature = "parquet")] - Parquet(ParquetWriterOptions), - CSV(CsvWriterOptions), - JSON(JsonWriterOptions), - Avro(AvroWriterOptions), - Arrow(ArrowWriterOptions), -} - -impl FileTypeWriterOptions { - /// Constructs a FileTypeWriterOptions given a FileType to be written - /// and arbitrary String tuple options. May return an error if any - /// string setting is unrecognized or unsupported. - pub fn build( - file_type: &FileType, - config_defaults: &ConfigOptions, - statement_options: &StatementOptions, - ) -> Result { - let options = (config_defaults, statement_options); - - let file_type_write_options = match file_type { - #[cfg(feature = "parquet")] - FileType::PARQUET => { - FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?) - } - FileType::CSV => { - FileTypeWriterOptions::CSV(CsvWriterOptions::try_from(options)?) - } - FileType::JSON => { - FileTypeWriterOptions::JSON(JsonWriterOptions::try_from(options)?) - } - FileType::AVRO => { - FileTypeWriterOptions::Avro(AvroWriterOptions::try_from(options)?) - } - FileType::ARROW => { - FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) - } - }; - - Ok(file_type_write_options) - } - - /// Constructs a FileTypeWriterOptions from session defaults only. - pub fn build_default( - file_type: &FileType, - config_defaults: &ConfigOptions, - ) -> Result { - let empty_statement = StatementOptions::new(vec![]); - let options = (config_defaults, &empty_statement); - - let file_type_write_options = match file_type { - #[cfg(feature = "parquet")] - FileType::PARQUET => { - FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?) - } - FileType::CSV => { - FileTypeWriterOptions::CSV(CsvWriterOptions::try_from(options)?) - } - FileType::JSON => { - FileTypeWriterOptions::JSON(JsonWriterOptions::try_from(options)?) - } - FileType::AVRO => { - FileTypeWriterOptions::Avro(AvroWriterOptions::try_from(options)?) - } - FileType::ARROW => { - FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) - } - }; - - Ok(file_type_write_options) - } - - /// Tries to extract ParquetWriterOptions from this FileTypeWriterOptions enum. - /// Returns an error if a different type from parquet is set. - #[cfg(feature = "parquet")] - pub fn try_into_parquet(&self) -> Result<&ParquetWriterOptions> { - match self { - FileTypeWriterOptions::Parquet(opt) => Ok(opt), - _ => Err(DataFusionError::Internal(format!( - "Expected parquet options but found options for: {}", - self - ))), - } - } - - /// Tries to extract CsvWriterOptions from this FileTypeWriterOptions enum. - /// Returns an error if a different type from csv is set. - pub fn try_into_csv(&self) -> Result<&CsvWriterOptions> { - match self { - FileTypeWriterOptions::CSV(opt) => Ok(opt), - _ => Err(DataFusionError::Internal(format!( - "Expected csv options but found options for {}", - self - ))), - } - } - - /// Tries to extract JsonWriterOptions from this FileTypeWriterOptions enum. - /// Returns an error if a different type from json is set. - pub fn try_into_json(&self) -> Result<&JsonWriterOptions> { - match self { - FileTypeWriterOptions::JSON(opt) => Ok(opt), - _ => Err(DataFusionError::Internal(format!( - "Expected json options but found options for {}", - self, - ))), - } - } - - /// Tries to extract AvroWriterOptions from this FileTypeWriterOptions enum. - /// Returns an error if a different type from avro is set. - pub fn try_into_avro(&self) -> Result<&AvroWriterOptions> { - match self { - FileTypeWriterOptions::Avro(opt) => Ok(opt), - _ => Err(DataFusionError::Internal(format!( - "Expected avro options but found options for {}!", - self - ))), - } - } - - /// Tries to extract ArrowWriterOptions from this FileTypeWriterOptions enum. - /// Returns an error if a different type from arrow is set. - pub fn try_into_arrow(&self) -> Result<&ArrowWriterOptions> { - match self { - FileTypeWriterOptions::Arrow(opt) => Ok(opt), - _ => Err(DataFusionError::Internal(format!( - "Expected arrow options but found options for {}", - self - ))), - } - } -} - -impl Display for FileTypeWriterOptions { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let name = match self { - FileTypeWriterOptions::Arrow(_) => "ArrowWriterOptions", - FileTypeWriterOptions::Avro(_) => "AvroWriterOptions", - FileTypeWriterOptions::CSV(_) => "CsvWriterOptions", - FileTypeWriterOptions::JSON(_) => "JsonWriterOptions", - #[cfg(feature = "parquet")] - FileTypeWriterOptions::Parquet(_) => "ParquetWriterOptions", - }; - write!(f, "{}", name) - } -} #[cfg(test)] #[cfg(feature = "parquet")] diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index ed547782e4a5e..fbda447accb96 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -54,7 +54,6 @@ pub use file_options::file_type::{ FileType, GetExt, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION, }; -pub use file_options::FileTypeWriterOptions; pub use functional_dependencies::{ aggregate_functional_dependencies, get_required_group_by_exprs_indices, get_target_functional_dependencies, Constraint, Constraints, Dependency, diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 3c3bcd497b7f4..55ab279957dcc 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -50,10 +50,9 @@ use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ - Column, DFSchema, DataFusionError, FileType, FileTypeWriterOptions, ParamValues, + Column, DFSchema, DataFusionError, FileType, ParamValues, SchemaError, UnnestOptions, }; -use datafusion_expr::dml::CopyOptions; use datafusion_expr::{ avg, count, is_null, max, median, min, stddev, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE, diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 650f8c844eda3..60360e7fd4d1b 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -140,10 +140,6 @@ impl FileFormat for ArrowFormat { order_requirements, )) as _) } - - fn file_type(&self) -> FileType { - FileType::ARROW - } } /// Implements [`DataSink`] for writing to arrow_ipc files diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index 6d424bf0b28f3..827ec20d15aa5 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -90,9 +90,6 @@ impl FileFormat for AvroFormat { Ok(Arc::new(exec)) } - fn file_type(&self) -> FileType { - FileType::AVRO - } } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 4033bcd3b5572..f4b645995837b 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -285,9 +285,6 @@ impl FileFormat for CsvFormat { )) as _) } - fn file_type(&self) -> FileType { - FileType::CSV - } } impl CsvFormat { diff --git a/datafusion/core/src/datasource/file_format/file_compression_type.rs b/datafusion/core/src/datasource/file_format/file_compression_type.rs index 3dac7c293050c..b2710d248c796 100644 --- a/datafusion/core/src/datasource/file_format/file_compression_type.rs +++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs @@ -231,29 +231,6 @@ pub trait FileTypeExt { fn get_ext_with_compression(&self, c: FileCompressionType) -> Result; } -impl FileTypeExt for FileType { - fn get_ext_with_compression(&self, c: FileCompressionType) -> Result { - let ext = self.get_ext(); - - match self { - FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())), - FileType::AVRO | FileType::ARROW => match c.variant { - UNCOMPRESSED => Ok(ext), - _ => Err(DataFusionError::Internal( - "FileCompressionType can be specified for CSV/JSON FileType.".into(), - )), - }, - #[cfg(feature = "parquet")] - FileType::PARQUET => match c.variant { - UNCOMPRESSED => Ok(ext), - _ => Err(DataFusionError::Internal( - "FileCompressionType can be specified for CSV/JSON FileType.".into(), - )), - }, - } - } -} - #[cfg(test)] mod tests { use crate::datasource::file_format::file_compression_type::{ diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index fcb1d5f8e5276..5452c56f79d44 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -188,10 +188,6 @@ impl FileFormat for JsonFormat { order_requirements, )) as _) } - - fn file_type(&self) -> FileType { - FileType::JSON - } } impl Default for JsonSerializer { diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 12c9fb91adb1a..43f4935bf8b2a 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -105,8 +105,6 @@ pub trait FileFormat: Send + Sync + fmt::Debug { not_impl_err!("Writer not implemented for this format") } - /// Returns the FileType corresponding to this FileFormat - fn file_type(&self) -> FileType; } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 7044acccd6dce..1e9bc70d63271 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -291,9 +291,6 @@ impl FileFormat for ParquetFormat { )) as _) } - fn file_type(&self) -> FileType { - FileType::PARQUET - } } fn summarize_min_max( diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a7af1bf1be28a..d83cadf98b1be 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -150,6 +150,9 @@ impl ListingTableConfig { ), #[cfg(feature = "parquet")] FileType::PARQUET => Arc::new(ParquetFormat::default()), + FileType::Extension(_) => { + unreachable!("FileType::from_str cannot return Extension variant!") + } }; Ok((file_format, ext)) @@ -768,7 +771,7 @@ impl TableProvider for ListingTable { let file_type_writer_options = match &self.options().file_type_write_options { Some(opt) => opt.clone(), None => FileTypeWriterOptions::build_default( - &file_format.file_type(), + &mut file_format.file_type(), state.config_options(), )?, }; @@ -1716,6 +1719,9 @@ mod tests { ) .await?; } + FileType::Extension(_) => { + panic!("Extension file type not implemented in write path.") + } } // Create and register the source table with the provided schema and inserted data diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index e8ffece320d7d..27476bedf727d 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -35,7 +35,7 @@ use crate::datasource::TableProvider; use crate::execution::context::SessionState; use arrow::datatypes::{DataType, SchemaRef}; -use datafusion_common::file_options::{FileTypeWriterOptions, StatementOptions}; +use datafusion_common::file_options::{StatementOptions}; use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, FileType}; use datafusion_expr::CreateExternalTable; @@ -88,6 +88,9 @@ impl TableProviderFactory for ListingTableFactory { JsonFormat::default().with_file_compression_type(file_compression_type), ), FileType::ARROW => Arc::new(ArrowFormat), + FileType::Extension(_) => { + unreachable!("FileType::from_str cannot return Extension variant!") + } }; let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() { @@ -181,6 +184,9 @@ impl TableProviderFactory for ListingTableFactory { FileType::PARQUET => file_type_writer_options, FileType::ARROW => file_type_writer_options, FileType::AVRO => file_type_writer_options, + FileType::Extension(_) => { + unreachable!("FileType::from_str cannot return Extension variant!") + } }; let table_path = ListingTableUrl::parse(&cmd.location)?; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 5583991355c6a..f2b5e766a66eb 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -66,7 +66,7 @@ use arrow::{ datatypes::{DataType, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, }; -use datafusion_common::{file_options::FileTypeWriterOptions, plan_err}; +use datafusion_common::{plan_err}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::ExecutionPlan; @@ -95,8 +95,8 @@ pub struct FileSinkConfig { pub single_file_output: bool, /// Controls whether existing data should be overwritten by this sink pub overwrite: bool, - /// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size - pub file_type_writer_options: FileTypeWriterOptions, + /// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size. + pub file_type_writer_options: StatementOptions, } impl FileSinkConfig { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 31d50be10f70a..4a2a14357e61d 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -605,6 +605,7 @@ impl DefaultPhysicalPlanner { FileType::JSON => Arc::new(JsonFormat::default()), FileType::AVRO => Arc::new(AvroFormat {} ), FileType::ARROW => Arc::new(ArrowFormat {}), + FileType::Extension(_ext) => return not_impl_err!("Extension FileTypes not supported in Copy To statements.") }; sink_format.create_writer_physical_plan(input_exec, session_state, config, None).await diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 549c25f89bae6..ca04fc1c5bb43 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -24,7 +24,7 @@ use std::convert::TryFrom; use std::iter::zip; use std::sync::Arc; -use crate::dml::{CopyOptions, CopyTo}; +use crate::dml::{CopyTo}; use crate::expr::Alias; use crate::expr_rewriter::{ coerce_plan_expr_for_schema, normalize_col, @@ -49,6 +49,7 @@ use crate::{ use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::display::ToStringifiedPlan; +use datafusion_common::file_options::StatementOptions; use datafusion_common::{ get_target_functional_dependencies, plan_datafusion_err, plan_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError, FileType, OwnedTableReference, Result, @@ -238,14 +239,12 @@ impl LogicalPlanBuilder { pub fn copy_to( input: LogicalPlan, output_url: String, - file_format: FileType, single_file_output: bool, - copy_options: CopyOptions, + copy_options: StatementOptions, ) -> Result { Ok(Self::from(LogicalPlan::Copy(CopyTo { input: Arc::new(input), output_url, - file_format, single_file_output, copy_options, }))) diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index 4cd56b89ac635..a781bed2bd8dd 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -21,7 +21,7 @@ use std::{ }; use datafusion_common::{ - file_options::StatementOptions, DFSchemaRef, FileType, FileTypeWriterOptions, + file_options::StatementOptions, DFSchemaRef, FileType, OwnedTableReference, }; @@ -34,55 +34,14 @@ pub struct CopyTo { pub input: Arc, /// The location to write the file(s) pub output_url: String, - /// The file format to output (explicitly defined or inferred from file extension) - pub file_format: FileType, /// If false, it is assumed output_url is a file to which all data should be written /// regardless of input partitioning. Otherwise, output_url is assumed to be a directory /// to which each output partition is written to its own output file pub single_file_output: bool, /// Arbitrary options as tuples - pub copy_options: CopyOptions, + pub copy_options: StatementOptions, } -/// When the logical plan is constructed from SQL, CopyOptions -/// will contain arbitrary string tuples which must be parsed into -/// FileTypeWriterOptions. When the logical plan is constructed directly -/// from rust code (such as via the DataFrame API), FileTypeWriterOptions -/// can be provided directly, avoiding the run time cost and fallibility of -/// parsing string based options. -#[derive(Clone)] -pub enum CopyOptions { - /// Holds StatementOptions parsed from a SQL statement - SQLOptions(StatementOptions), - /// Holds FileTypeWriterOptions directly provided - WriterOptions(Box), -} - -impl PartialEq for CopyOptions { - fn eq(&self, other: &CopyOptions) -> bool { - match self { - Self::SQLOptions(statement1) => match other { - Self::SQLOptions(statement2) => statement1.eq(statement2), - Self::WriterOptions(_) => false, - }, - Self::WriterOptions(_) => false, - } - } -} - -impl Eq for CopyOptions {} - -impl std::hash::Hash for CopyOptions { - fn hash(&self, hasher: &mut H) - where - H: std::hash::Hasher, - { - match self { - Self::SQLOptions(statement) => statement.hash(hasher), - Self::WriterOptions(_) => (), - } - } -} /// The operator that modifies the content of a database (adapted from /// substrait WriteRel) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 9b0f441ef9026..088408fb54007 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -24,7 +24,6 @@ use std::sync::Arc; use super::dml::CopyTo; use super::DdlStatement; -use crate::dml::CopyOptions; use crate::expr::{Alias, Exists, InSubquery, Placeholder, Sort as SortExpr}; use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols}; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; @@ -621,13 +620,11 @@ impl LogicalPlan { LogicalPlan::Copy(CopyTo { input: _, output_url, - file_format, copy_options, single_file_output, }) => Ok(LogicalPlan::Copy(CopyTo { input: Arc::new(inputs[0].clone()), output_url: output_url.clone(), - file_format: file_format.clone(), single_file_output: *single_file_output, copy_options: copy_options.clone(), })), @@ -1552,22 +1549,18 @@ impl LogicalPlan { LogicalPlan::Copy(CopyTo { input: _, output_url, - file_format, single_file_output, copy_options, }) => { - let op_str = match copy_options { - CopyOptions::SQLOptions(statement) => statement - .clone() - .into_inner() - .iter() - .map(|(k, v)| format!("{k} {v}")) - .collect::>() - .join(", "), - CopyOptions::WriterOptions(_) => "".into(), - }; - - write!(f, "CopyTo: format={file_format} output_url={output_url} single_file_output={single_file_output} options: ({op_str})") + let op_str = copy_options + .clone() + .into_inner() + .iter() + .map(|(k, v)| format!("{k} {v}")) + .collect::>() + .join(", "); + + write!(f, "CopyTo: output_url={output_url} single_file_output={single_file_output} options: ({op_str})") } LogicalPlan::Ddl(ddl) => { write!(f, "{}", ddl.display()) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index e9cdb34cf1b9a..203f8380776a0 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -891,6 +891,9 @@ impl TryFrom<&FileTypeWriterOptions> for protobuf::FileTypeWriterOptions { FileTypeWriterOptions::Arrow(ArrowWriterOptions {}) => { return not_impl_err!("Arrow file sink protobuf serialization") } + FileTypeWriterOptions::Extension(_) => { + return not_impl_err!("Extension file sink protobuf serialization") + } }; Ok(Self { file_type: Some(file_type), diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 12083554f0932..d2127d77c88ca 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -35,7 +35,7 @@ use datafusion_common::{ Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, ScalarValue, SchemaReference, TableReference, ToDFSchema, }; -use datafusion_expr::dml::{CopyOptions, CopyTo}; +use datafusion_expr::dml::{CopyTo}; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; use datafusion_expr::logical_plan::builder::project; use datafusion_expr::logical_plan::DdlStatement; @@ -675,20 +675,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|(s, v)| (s.to_owned(), v.to_string())) .collect::>(); - let mut statement_options = StatementOptions::new(options); - let file_format = statement_options.try_infer_file_type(&statement.target)?; + let mut copy_options = StatementOptions::new(options); let single_file_output = - statement_options.take_bool_option("single_file_output")?; + copy_options.take_bool_option("single_file_output")?; // COPY defaults to outputting a single file if not otherwise specified let single_file_output = single_file_output.unwrap_or(true); - let copy_options = CopyOptions::SQLOptions(statement_options); Ok(LogicalPlan::Copy(CopyTo { input: Arc::new(input), output_url: statement.target, - file_format, single_file_output, copy_options, }))