From 9dce12f4045ac55dccd105b48e76a29fa74b3a24 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Thu, 28 Dec 2023 10:27:13 -0500 Subject: [PATCH 1/5] squash --- .../common/src/file_options/file_type.rs | 60 ++++++++++++++++++- datafusion/common/src/file_options/mod.rs | 8 ++- .../file_format/file_compression_type.rs | 1 + .../core/src/datasource/listing/table.rs | 8 ++- .../src/datasource/listing_table_factory.rs | 6 ++ datafusion/core/src/physical_planner.rs | 1 + .../proto/src/physical_plan/to_proto.rs | 3 + 7 files changed, 84 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/common/src/file_options/file_type.rs index 97362bdad3ccc..1d12510cdf73a 100644 --- a/datafusion/common/src/file_options/file_type.rs +++ b/datafusion/common/src/file_options/file_type.rs @@ -18,9 +18,11 @@ //! File type abstraction use crate::error::{DataFusionError, Result}; +use crate::parsers::CompressionTypeVariant; use core::fmt; use std::fmt::Display; +use std::hash::Hasher; use std::str::FromStr; /// The default file extension of arrow files @@ -41,7 +43,7 @@ pub trait GetExt { } /// Readable file type -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Hash)] pub enum FileType { /// Apache Arrow file ARROW, @@ -54,8 +56,62 @@ pub enum FileType { CSV, /// JSON file JSON, + /// FileType Implemented Outside of DataFusion + Extension(Box), } +/// A trait to enable externally implementing the functionality of a [FileType]. +pub trait ExtensionFileType: + std::fmt::Debug + ExtensionFileTypeClone + Send + Sync +{ + /// Returns the default file extension for this type, e.g. CSV would return ".csv".to_owned() + fn default_extension(&self) -> String; + + /// Returns the file extension when it is compressed with a given [FileCompressionType] + fn extension_with_compression( + &self, + compression: CompressionTypeVariant, + ) -> Result; +} + +pub trait ExtensionFileTypeClone { + fn clone_box(&self) -> Box; +} + +impl Clone for Box { + fn clone(&self) -> Box { + self.clone_box() + } +} + +impl std::hash::Hash for Box { + fn hash(&self, state: &mut H) + where + H: Hasher, + { + self.default_extension().hash(state) + } +} + +impl PartialEq for FileType { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (FileType::ARROW, FileType::ARROW) => true, + (FileType::AVRO, FileType::AVRO) => true, + #[cfg(feature = "parquet")] + (FileType::PARQUET, FileType::PARQUET) => true, + (FileType::CSV, FileType::CSV) => true, + (FileType::JSON, FileType::JSON) => true, + (FileType::Extension(ext_self), FileType::Extension(ext_other)) => { + ext_self.default_extension() == ext_other.default_extension() + } + _ => false, + } + } +} + +impl Eq for FileType {} + impl GetExt for FileType { fn get_ext(&self) -> String { match self { @@ -65,6 +121,7 @@ impl GetExt for FileType { FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(), FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(), FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(), + FileType::Extension(ext) => ext.default_extension(), } } } @@ -78,6 +135,7 @@ impl Display for FileType { FileType::PARQUET => "parquet", FileType::AVRO => "avro", FileType::ARROW => "arrow", + FileType::Extension(ext) => return ext.fmt(f), }; write!(f, "{}", out) } diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index 1d661b17eb1c0..c1066b0b1e9ef 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -154,6 +154,9 @@ pub enum FileTypeWriterOptions { JSON(JsonWriterOptions), Avro(AvroWriterOptions), Arrow(ArrowWriterOptions), + /// For extension [FileType]s, FileTypeWriterOptions ignores all + /// passed options and returns an empty variant. + Extension, } impl FileTypeWriterOptions { @@ -184,6 +187,7 @@ impl FileTypeWriterOptions { FileType::ARROW => { FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) } + FileType::Extension(_) => FileTypeWriterOptions::Extension, }; Ok(file_type_write_options) @@ -191,7 +195,7 @@ impl FileTypeWriterOptions { /// Constructs a FileTypeWriterOptions from session defaults only. pub fn build_default( - file_type: &FileType, + file_type: &mut FileType, config_defaults: &ConfigOptions, ) -> Result { let empty_statement = StatementOptions::new(vec![]); @@ -214,6 +218,7 @@ impl FileTypeWriterOptions { FileType::ARROW => { FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) } + FileType::Extension(_) => FileTypeWriterOptions::Extension, }; Ok(file_type_write_options) @@ -290,6 +295,7 @@ impl Display for FileTypeWriterOptions { FileTypeWriterOptions::JSON(_) => "JsonWriterOptions", #[cfg(feature = "parquet")] FileTypeWriterOptions::Parquet(_) => "ParquetWriterOptions", + FileTypeWriterOptions::Extension => "ExensionWriterOptions", }; write!(f, "{}", name) } diff --git a/datafusion/core/src/datasource/file_format/file_compression_type.rs b/datafusion/core/src/datasource/file_format/file_compression_type.rs index 3dac7c293050c..4340298bed1b0 100644 --- a/datafusion/core/src/datasource/file_format/file_compression_type.rs +++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs @@ -250,6 +250,7 @@ impl FileTypeExt for FileType { "FileCompressionType can be specified for CSV/JSON FileType.".into(), )), }, + FileType::Extension(ext) => ext.extension_with_compression(c.variant), } } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a7af1bf1be28a..d83cadf98b1be 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -150,6 +150,9 @@ impl ListingTableConfig { ), #[cfg(feature = "parquet")] FileType::PARQUET => Arc::new(ParquetFormat::default()), + FileType::Extension(_) => { + unreachable!("FileType::from_str cannot return Extension variant!") + } }; Ok((file_format, ext)) @@ -768,7 +771,7 @@ impl TableProvider for ListingTable { let file_type_writer_options = match &self.options().file_type_write_options { Some(opt) => opt.clone(), None => FileTypeWriterOptions::build_default( - &file_format.file_type(), + &mut file_format.file_type(), state.config_options(), )?, }; @@ -1716,6 +1719,9 @@ mod tests { ) .await?; } + FileType::Extension(_) => { + panic!("Extension file type not implemented in write path.") + } } // Create and register the source table with the provided schema and inserted data diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index e8ffece320d7d..5d73bc4946092 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -88,6 +88,9 @@ impl TableProviderFactory for ListingTableFactory { JsonFormat::default().with_file_compression_type(file_compression_type), ), FileType::ARROW => Arc::new(ArrowFormat), + FileType::Extension(_) => { + unreachable!("FileType::from_str cannot return Extension variant!") + } }; let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() { @@ -181,6 +184,9 @@ impl TableProviderFactory for ListingTableFactory { FileType::PARQUET => file_type_writer_options, FileType::ARROW => file_type_writer_options, FileType::AVRO => file_type_writer_options, + FileType::Extension(_) => { + unreachable!("FileType::from_str cannot return Extension variant!") + } }; let table_path = ListingTableUrl::parse(&cmd.location)?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 31d50be10f70a..4a2a14357e61d 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -605,6 +605,7 @@ impl DefaultPhysicalPlanner { FileType::JSON => Arc::new(JsonFormat::default()), FileType::AVRO => Arc::new(AvroFormat {} ), FileType::ARROW => Arc::new(ArrowFormat {}), + FileType::Extension(_ext) => return not_impl_err!("Extension FileTypes not supported in Copy To statements.") }; sink_format.create_writer_physical_plan(input_exec, session_state, config, None).await diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index e9cdb34cf1b9a..7318984992397 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -891,6 +891,9 @@ impl TryFrom<&FileTypeWriterOptions> for protobuf::FileTypeWriterOptions { FileTypeWriterOptions::Arrow(ArrowWriterOptions {}) => { return not_impl_err!("Arrow file sink protobuf serialization") } + FileTypeWriterOptions::Extension => { + return not_impl_err!("Extension file sink protobuf serialization") + } }; Ok(Self { file_type: Some(file_type), From b93f6f8111171a88161fc62a069c5708cce6ebe6 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Thu, 28 Dec 2023 10:53:28 -0500 Subject: [PATCH 2/5] pass through StatementOptions --- datafusion/common/src/file_options/mod.rs | 15 +++++++++------ .../core/src/datasource/physical_plan/mod.rs | 4 +++- datafusion/proto/src/physical_plan/to_proto.rs | 2 +- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index c1066b0b1e9ef..4c045551f6b65 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -154,9 +154,10 @@ pub enum FileTypeWriterOptions { JSON(JsonWriterOptions), Avro(AvroWriterOptions), Arrow(ArrowWriterOptions), - /// For extension [FileType]s, FileTypeWriterOptions ignores all - /// passed options and returns an empty variant. - Extension, + /// For extension [FileType]s, FileTypeWriterOptions simply stores + /// any passed StatementOptions to be handled later by any custom + /// physical plans (Such as a FileFormat::create_writer_physical_plan) + Extension(Option), } impl FileTypeWriterOptions { @@ -187,7 +188,9 @@ impl FileTypeWriterOptions { FileType::ARROW => { FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) } - FileType::Extension(_) => FileTypeWriterOptions::Extension, + FileType::Extension(_) => { + FileTypeWriterOptions::Extension(Some(statement_options.clone())) + } }; Ok(file_type_write_options) @@ -218,7 +221,7 @@ impl FileTypeWriterOptions { FileType::ARROW => { FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) } - FileType::Extension(_) => FileTypeWriterOptions::Extension, + FileType::Extension(_) => FileTypeWriterOptions::Extension(None), }; Ok(file_type_write_options) @@ -295,7 +298,7 @@ impl Display for FileTypeWriterOptions { FileTypeWriterOptions::JSON(_) => "JsonWriterOptions", #[cfg(feature = "parquet")] FileTypeWriterOptions::Parquet(_) => "ParquetWriterOptions", - FileTypeWriterOptions::Extension => "ExensionWriterOptions", + FileTypeWriterOptions::Extension(_) => "ExensionWriterOptions", }; write!(f, "{}", name) } diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 5583991355c6a..312183588c456 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -95,7 +95,9 @@ pub struct FileSinkConfig { pub single_file_output: bool, /// Controls whether existing data should be overwritten by this sink pub overwrite: bool, - /// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size + /// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size. + /// Note that for externally defined FileTypes, FileTypeWriterOptions contains arbitrary + /// config tuples that must be handled within the physical plan. pub file_type_writer_options: FileTypeWriterOptions, } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 7318984992397..203f8380776a0 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -891,7 +891,7 @@ impl TryFrom<&FileTypeWriterOptions> for protobuf::FileTypeWriterOptions { FileTypeWriterOptions::Arrow(ArrowWriterOptions {}) => { return not_impl_err!("Arrow file sink protobuf serialization") } - FileTypeWriterOptions::Extension => { + FileTypeWriterOptions::Extension(_) => { return not_impl_err!("Extension file sink protobuf serialization") } }; From 4783a1a025779662277c4584ea1fb8d2a2f089cf Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Thu, 28 Dec 2023 11:00:03 -0500 Subject: [PATCH 3/5] lint and doc update --- datafusion/common/src/file_options/file_type.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/common/src/file_options/file_type.rs index 1d12510cdf73a..8af1ecef9cf3b 100644 --- a/datafusion/common/src/file_options/file_type.rs +++ b/datafusion/common/src/file_options/file_type.rs @@ -43,6 +43,7 @@ pub trait GetExt { } /// Readable file type +#[allow(clippy::derived_hash_with_manual_eq)] #[derive(Debug, Clone, Hash)] pub enum FileType { /// Apache Arrow file @@ -65,9 +66,12 @@ pub trait ExtensionFileType: std::fmt::Debug + ExtensionFileTypeClone + Send + Sync { /// Returns the default file extension for this type, e.g. CSV would return ".csv".to_owned() + /// The default_extension is also used to uniquely identify a specific FileType::Extension variant, + /// so ensure this String is unique from any built in FileType and any other ExtensionFileTypes + /// defined. fn default_extension(&self) -> String; - /// Returns the file extension when it is compressed with a given [FileCompressionType] + /// Returns the file extension when it is compressed with a given [CompressionTypeVariant] fn extension_with_compression( &self, compression: CompressionTypeVariant, From 0559aa9d0d305f5f677b8c4b2c14c1177a7692d8 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Thu, 28 Dec 2023 11:14:52 -0500 Subject: [PATCH 4/5] try bump test stack size --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index a541091e3a2b2..6a14c0ee8b48d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -98,7 +98,7 @@ jobs: with: rust-version: stable - name: Run tests (excluding doctests) - run: cargo test --lib --tests --bins --features avro,json,backtrace + run: RUST_MIN_STACK=504857600 cargo test --lib --tests --bins --features avro,json,backtrace - name: Verify Working Directory Clean run: git diff --exit-code From 4ad7f6e347275e3c0d365485d15b0ac207bb27ef Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Sat, 6 Jan 2024 17:08:30 -0500 Subject: [PATCH 5/5] starting to rip out enums... --- datafusion-examples/examples/bench.py | 88 ++++++++ .../test_csv/5idJzooCjySkTpLf_0.csv | Bin 0 -> 36 bytes .../test_csv/HGQahh0J2jquvQ6M_0.csv | 4 + .../test_json/Ml2qxq3EGuEpgtz6_0.json | 3 + .../test_json/i87q26cL7eYNxxnQ_0.json | 3 + .../test_parquet/YTByOUmxGAgr4Z8t_0.parquet | Bin 0 -> 310 bytes .../test_parquet/e9tydLQCWeJ8ul8D_0.parquet | Bin 0 -> 310 bytes .../test_table/EkdN1r77DpYeMh3w_0.parquet | Bin 0 -> 310 bytes .../test_table/oUn5SdwSqetWEpqU_0.parquet | Bin 0 -> 310 bytes .../common/src/file_options/file_type.rs | 132 +----------- datafusion/common/src/file_options/mod.rs | 192 ------------------ datafusion/common/src/lib.rs | 1 - datafusion/core/src/dataframe/mod.rs | 3 +- .../core/src/datasource/file_format/arrow.rs | 4 - .../core/src/datasource/file_format/avro.rs | 3 - .../core/src/datasource/file_format/csv.rs | 3 - .../file_format/file_compression_type.rs | 24 --- .../core/src/datasource/file_format/json.rs | 4 - .../core/src/datasource/file_format/mod.rs | 2 - .../src/datasource/file_format/parquet.rs | 3 - .../src/datasource/listing_table_factory.rs | 2 +- .../core/src/datasource/physical_plan/mod.rs | 6 +- datafusion/expr/src/logical_plan/builder.rs | 7 +- datafusion/expr/src/logical_plan/dml.rs | 45 +--- datafusion/expr/src/logical_plan/plan.rs | 25 +-- datafusion/sql/src/statement.rs | 9 +- 26 files changed, 128 insertions(+), 435 deletions(-) create mode 100644 datafusion-examples/examples/bench.py create mode 100644 datafusion-examples/test_csv/5idJzooCjySkTpLf_0.csv create mode 100644 datafusion-examples/test_csv/HGQahh0J2jquvQ6M_0.csv create mode 100644 datafusion-examples/test_json/Ml2qxq3EGuEpgtz6_0.json create mode 100644 datafusion-examples/test_json/i87q26cL7eYNxxnQ_0.json create mode 100644 datafusion-examples/test_parquet/YTByOUmxGAgr4Z8t_0.parquet create mode 100644 datafusion-examples/test_parquet/e9tydLQCWeJ8ul8D_0.parquet create mode 100644 datafusion-examples/test_table/EkdN1r77DpYeMh3w_0.parquet create mode 100644 datafusion-examples/test_table/oUn5SdwSqetWEpqU_0.parquet diff --git a/datafusion-examples/examples/bench.py b/datafusion-examples/examples/bench.py new file mode 100644 index 0000000000000..f9f8e2e7c57eb --- /dev/null +++ b/datafusion-examples/examples/bench.py @@ -0,0 +1,88 @@ +import polars as pl +import time +from datetime import date +from datafusion import SessionContext + +t = time.time() + +#file = "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet" + + +file = "/home/dev/arrow-datafusion/test_out/benchon.parquet" +#file = "/home/dev/arrow-datafusion/test_out/uncompressed.parquet" + + +# Create a DataFusion context +ctx = SessionContext() + +# Register table with context +ctx.register_parquet('test', file) + +times = [] +for i in range(5): + + t = time.time() + df = pl.scan_parquet(file) \ + .filter(pl.col("l_shipdate") <= date(1998, 9, 2)) \ + .group_by("l_returnflag", "l_linestatus") \ + .agg([ + pl.col("l_quantity").sum().alias("sum_qty"), + pl.col("l_extendedprice").sum().alias("sum_base_price"), + (pl.col("l_extendedprice") * (1 - pl.col("l_discount"))).sum().alias("sum_disc_price"), + (pl.col("l_extendedprice") * (1 - pl.col("l_discount")) * (1 + pl.col("l_tax"))).sum().alias("sum_charge"), + pl.col("l_quantity").mean().alias("avg_qty"), + pl.col("l_extendedprice").mean().alias("avg_price"), + pl.col("l_discount").mean().alias("avg_disc"), + pl.count().alias("count_order") + ] + ) \ + .sort([pl.col("l_returnflag"), pl.col("l_linestatus")]) + df = df.collect() + + print(f"polars agg query {time.time()-t}s") + + #t = time.time() + #pl.scan_parquet(file).sink_parquet("test_out/pl.parquet") + #print(f"polars re-endcode job {time.time()-t}") + + + t = time.time() + + + query = """ + select + l_returnflag, + l_linestatus, + sum(l_quantity) as sum_qty, + sum(l_extendedprice) as sum_base_price, + sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, + sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, + avg(l_quantity) as avg_qty, + avg(l_extendedprice) as avg_price, + avg(l_discount) as avg_disc, + count(*) as count_order +from + test +where + l_shipdate <= date '1998-09-02' +group by + l_returnflag, + l_linestatus +order by + l_returnflag, + l_linestatus; + """ + + # Execute SQL + df = ctx.sql(f"{query}").cache() + elapsed = time.time() - t + times.append(elapsed) + print(f"datafusion agg query {elapsed}s") + + # t = time.time() + # df = ctx.sql("copy test to 'test_out/df.parquet'") + # df.show() + # print(f"datafusion reendcode job {time.time() - t}s") + +print(sum(times)/len(times)) + diff --git a/datafusion-examples/test_csv/5idJzooCjySkTpLf_0.csv b/datafusion-examples/test_csv/5idJzooCjySkTpLf_0.csv new file mode 100644 index 0000000000000000000000000000000000000000..f4263141197ddb2ebbfb729f9669b29f0eb24b6b GIT binary patch literal 36 ncmb2|=3oGW|Jt5Dr#yYnoi}{q^~CFu*E1%DsC3x?0iYxR&Fl+K literal 0 HcmV?d00001 diff --git a/datafusion-examples/test_csv/HGQahh0J2jquvQ6M_0.csv b/datafusion-examples/test_csv/HGQahh0J2jquvQ6M_0.csv new file mode 100644 index 0000000000000..7908fb2e6579f --- /dev/null +++ b/datafusion-examples/test_csv/HGQahh0J2jquvQ6M_0.csv @@ -0,0 +1,4 @@ +tablecol1 +a +b +c diff --git a/datafusion-examples/test_json/Ml2qxq3EGuEpgtz6_0.json b/datafusion-examples/test_json/Ml2qxq3EGuEpgtz6_0.json new file mode 100644 index 0000000000000..58c2306646888 --- /dev/null +++ b/datafusion-examples/test_json/Ml2qxq3EGuEpgtz6_0.json @@ -0,0 +1,3 @@ +{"tablecol1":"a"} +{"tablecol1":"b"} +{"tablecol1":"c"} diff --git a/datafusion-examples/test_json/i87q26cL7eYNxxnQ_0.json b/datafusion-examples/test_json/i87q26cL7eYNxxnQ_0.json new file mode 100644 index 0000000000000..58c2306646888 --- /dev/null +++ b/datafusion-examples/test_json/i87q26cL7eYNxxnQ_0.json @@ -0,0 +1,3 @@ +{"tablecol1":"a"} +{"tablecol1":"b"} +{"tablecol1":"c"} diff --git a/datafusion-examples/test_parquet/YTByOUmxGAgr4Z8t_0.parquet b/datafusion-examples/test_parquet/YTByOUmxGAgr4Z8t_0.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f5638ede43618a74f296956e940842952b67e684 GIT binary patch literal 310 zcmaJ-Jxc>Y5PiGK65)y%Zk7xzB9bf?A-SOb06U8-qNKP^Hs=L}#Dhzs*yTU?4q7tpqPA5rkY~LWh!s0 zAe|lH9?()4rH?wGh;Lw;+&)=`$*joB*^nY@>{{84@~Ouj&8M1$Moa|Q0_^OVGD$$1 zdv{T&s`CADkv|vnKw6l)`hc~I)XAtd?AZBl<^ES5`^H|`erbyBTGIvUy^L?jZn2!h dPhlCRtHn$Cdj3{afAQ+NpY#)aiq=Rz_6r|HH#GnN literal 0 HcmV?d00001 diff --git a/datafusion-examples/test_parquet/e9tydLQCWeJ8ul8D_0.parquet b/datafusion-examples/test_parquet/e9tydLQCWeJ8ul8D_0.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f5638ede43618a74f296956e940842952b67e684 GIT binary patch literal 310 zcmaJ-Jxc>Y5PiGK65)y%Zk7xzB9bf?A-SOb06U8-qNKP^Hs=L}#Dhzs*yTU?4q7tpqPA5rkY~LWh!s0 zAe|lH9?()4rH?wGh;Lw;+&)=`$*joB*^nY@>{{84@~Ouj&8M1$Moa|Q0_^OVGD$$1 zdv{T&s`CADkv|vnKw6l)`hc~I)XAtd?AZBl<^ES5`^H|`erbyBTGIvUy^L?jZn2!h dPhlCRtHn$Cdj3{afAQ+NpY#)aiq=Rz_6r|HH#GnN literal 0 HcmV?d00001 diff --git a/datafusion-examples/test_table/EkdN1r77DpYeMh3w_0.parquet b/datafusion-examples/test_table/EkdN1r77DpYeMh3w_0.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f5638ede43618a74f296956e940842952b67e684 GIT binary patch literal 310 zcmaJ-Jxc>Y5PiGK65)y%Zk7xzB9bf?A-SOb06U8-qNKP^Hs=L}#Dhzs*yTU?4q7tpqPA5rkY~LWh!s0 zAe|lH9?()4rH?wGh;Lw;+&)=`$*joB*^nY@>{{84@~Ouj&8M1$Moa|Q0_^OVGD$$1 zdv{T&s`CADkv|vnKw6l)`hc~I)XAtd?AZBl<^ES5`^H|`erbyBTGIvUy^L?jZn2!h dPhlCRtHn$Cdj3{afAQ+NpY#)aiq=Rz_6r|HH#GnN literal 0 HcmV?d00001 diff --git a/datafusion-examples/test_table/oUn5SdwSqetWEpqU_0.parquet b/datafusion-examples/test_table/oUn5SdwSqetWEpqU_0.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f5638ede43618a74f296956e940842952b67e684 GIT binary patch literal 310 zcmaJ-Jxc>Y5PiGK65)y%Zk7xzB9bf?A-SOb06U8-qNKP^Hs=L}#Dhzs*yTU?4q7tpqPA5rkY~LWh!s0 zAe|lH9?()4rH?wGh;Lw;+&)=`$*joB*^nY@>{{84@~Ouj&8M1$Moa|Q0_^OVGD$$1 zdv{T&s`CADkv|vnKw6l)`hc~I)XAtd?AZBl<^ES5`^H|`erbyBTGIvUy^L?jZn2!h dPhlCRtHn$Cdj3{afAQ+NpY#)aiq=Rz_6r|HH#GnN literal 0 HcmV?d00001 diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/common/src/file_options/file_type.rs index 8af1ecef9cf3b..7bc598806612f 100644 --- a/datafusion/common/src/file_options/file_type.rs +++ b/datafusion/common/src/file_options/file_type.rs @@ -42,28 +42,10 @@ pub trait GetExt { fn get_ext(&self) -> String; } -/// Readable file type -#[allow(clippy::derived_hash_with_manual_eq)] -#[derive(Debug, Clone, Hash)] -pub enum FileType { - /// Apache Arrow file - ARROW, - /// Apache Avro file - AVRO, - /// Apache Parquet file - #[cfg(feature = "parquet")] - PARQUET, - /// CSV file - CSV, - /// JSON file - JSON, - /// FileType Implemented Outside of DataFusion - Extension(Box), -} - -/// A trait to enable externally implementing the functionality of a [FileType]. -pub trait ExtensionFileType: - std::fmt::Debug + ExtensionFileTypeClone + Send + Sync +/// A trait which provides information during planning time about a type of file which may be defined +/// externally. Use SessionContext::register_file_type to add new implementations. +pub trait FileType: + std::fmt::Debug + FileTypeClone + Send + Sync { /// Returns the default file extension for this type, e.g. CSV would return ".csv".to_owned() /// The default_extension is also used to uniquely identify a specific FileType::Extension variant, @@ -78,17 +60,17 @@ pub trait ExtensionFileType: ) -> Result; } -pub trait ExtensionFileTypeClone { - fn clone_box(&self) -> Box; +pub trait FileTypeClone { + fn clone_box(&self) -> Box; } -impl Clone for Box { - fn clone(&self) -> Box { +impl Clone for Box { + fn clone(&self) -> Box { self.clone_box() } } -impl std::hash::Hash for Box { +impl std::hash::Hash for Box { fn hash(&self, state: &mut H) where H: Hasher, @@ -96,99 +78,3 @@ impl std::hash::Hash for Box { self.default_extension().hash(state) } } - -impl PartialEq for FileType { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (FileType::ARROW, FileType::ARROW) => true, - (FileType::AVRO, FileType::AVRO) => true, - #[cfg(feature = "parquet")] - (FileType::PARQUET, FileType::PARQUET) => true, - (FileType::CSV, FileType::CSV) => true, - (FileType::JSON, FileType::JSON) => true, - (FileType::Extension(ext_self), FileType::Extension(ext_other)) => { - ext_self.default_extension() == ext_other.default_extension() - } - _ => false, - } - } -} - -impl Eq for FileType {} - -impl GetExt for FileType { - fn get_ext(&self) -> String { - match self { - FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(), - FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(), - #[cfg(feature = "parquet")] - FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(), - FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(), - FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(), - FileType::Extension(ext) => ext.default_extension(), - } - } -} - -impl Display for FileType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let out = match self { - FileType::CSV => "csv", - FileType::JSON => "json", - #[cfg(feature = "parquet")] - FileType::PARQUET => "parquet", - FileType::AVRO => "avro", - FileType::ARROW => "arrow", - FileType::Extension(ext) => return ext.fmt(f), - }; - write!(f, "{}", out) - } -} - -impl FromStr for FileType { - type Err = DataFusionError; - - fn from_str(s: &str) -> Result { - let s = s.to_uppercase(); - match s.as_str() { - "ARROW" => Ok(FileType::ARROW), - "AVRO" => Ok(FileType::AVRO), - #[cfg(feature = "parquet")] - "PARQUET" => Ok(FileType::PARQUET), - "CSV" => Ok(FileType::CSV), - "JSON" | "NDJSON" => Ok(FileType::JSON), - _ => Err(DataFusionError::NotImplemented(format!( - "Unknown FileType: {s}" - ))), - } - } -} - -#[cfg(test)] -#[cfg(feature = "parquet")] -mod tests { - use crate::error::DataFusionError; - use crate::file_options::FileType; - use std::str::FromStr; - - #[test] - fn from_str() { - for (ext, file_type) in [ - ("csv", FileType::CSV), - ("CSV", FileType::CSV), - ("json", FileType::JSON), - ("JSON", FileType::JSON), - ("avro", FileType::AVRO), - ("AVRO", FileType::AVRO), - ("parquet", FileType::PARQUET), - ("PARQUET", FileType::PARQUET), - ] { - assert_eq!(FileType::from_str(ext).unwrap(), file_type); - } - - assert!(matches!( - FileType::from_str("Unknown"), - Err(DataFusionError::NotImplemented(_)) - )); - } -} diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index 4c045551f6b65..a5ab20c517aff 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -97,37 +97,6 @@ impl StatementOptions { maybe_option.map(|(_, v)| v) } - /// Infers the file_type given a target and arbitrary options. - /// If the options contain an explicit "format" option, that will be used. - /// Otherwise, attempt to infer file_type from the extension of target. - /// Finally, return an error if unable to determine the file_type - /// If found, format is removed from the options list. - pub fn try_infer_file_type(&mut self, target: &str) -> Result { - let explicit_format = self.scan_and_remove_option("format"); - let format = match explicit_format { - Some(s) => FileType::from_str(s.1.as_str()), - None => { - // try to infer file format from file extension - let extension: &str = &Path::new(target) - .extension() - .ok_or(DataFusionError::Configuration( - "Format not explicitly set and unable to get file extension!" - .to_string(), - ))? - .to_str() - .ok_or(DataFusionError::Configuration( - "Format not explicitly set and failed to parse file extension!" - .to_string(), - ))? - .to_lowercase(); - - FileType::from_str(extension) - } - }?; - - Ok(format) - } - /// Finds an option in StatementOptions if exists, removes and returns it /// along with the vec of remaining options. fn scan_and_remove_option(&mut self, find: &str) -> Option { @@ -142,167 +111,6 @@ impl StatementOptions { } } -/// This type contains all options needed to initialize a particular -/// RecordBatchWriter type. Each element in the enum contains a thin wrapper -/// around a "writer builder" type (e.g. arrow::csv::WriterBuilder) -/// plus any DataFusion specific writing options (e.g. CSV compression) -#[derive(Clone, Debug)] -pub enum FileTypeWriterOptions { - #[cfg(feature = "parquet")] - Parquet(ParquetWriterOptions), - CSV(CsvWriterOptions), - JSON(JsonWriterOptions), - Avro(AvroWriterOptions), - Arrow(ArrowWriterOptions), - /// For extension [FileType]s, FileTypeWriterOptions simply stores - /// any passed StatementOptions to be handled later by any custom - /// physical plans (Such as a FileFormat::create_writer_physical_plan) - Extension(Option), -} - -impl FileTypeWriterOptions { - /// Constructs a FileTypeWriterOptions given a FileType to be written - /// and arbitrary String tuple options. May return an error if any - /// string setting is unrecognized or unsupported. - pub fn build( - file_type: &FileType, - config_defaults: &ConfigOptions, - statement_options: &StatementOptions, - ) -> Result { - let options = (config_defaults, statement_options); - - let file_type_write_options = match file_type { - #[cfg(feature = "parquet")] - FileType::PARQUET => { - FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?) - } - FileType::CSV => { - FileTypeWriterOptions::CSV(CsvWriterOptions::try_from(options)?) - } - FileType::JSON => { - FileTypeWriterOptions::JSON(JsonWriterOptions::try_from(options)?) - } - FileType::AVRO => { - FileTypeWriterOptions::Avro(AvroWriterOptions::try_from(options)?) - } - FileType::ARROW => { - FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) - } - FileType::Extension(_) => { - FileTypeWriterOptions::Extension(Some(statement_options.clone())) - } - }; - - Ok(file_type_write_options) - } - - /// Constructs a FileTypeWriterOptions from session defaults only. - pub fn build_default( - file_type: &mut FileType, - config_defaults: &ConfigOptions, - ) -> Result { - let empty_statement = StatementOptions::new(vec![]); - let options = (config_defaults, &empty_statement); - - let file_type_write_options = match file_type { - #[cfg(feature = "parquet")] - FileType::PARQUET => { - FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?) - } - FileType::CSV => { - FileTypeWriterOptions::CSV(CsvWriterOptions::try_from(options)?) - } - FileType::JSON => { - FileTypeWriterOptions::JSON(JsonWriterOptions::try_from(options)?) - } - FileType::AVRO => { - FileTypeWriterOptions::Avro(AvroWriterOptions::try_from(options)?) - } - FileType::ARROW => { - FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) - } - FileType::Extension(_) => FileTypeWriterOptions::Extension(None), - }; - - Ok(file_type_write_options) - } - - /// Tries to extract ParquetWriterOptions from this FileTypeWriterOptions enum. - /// Returns an error if a different type from parquet is set. - #[cfg(feature = "parquet")] - pub fn try_into_parquet(&self) -> Result<&ParquetWriterOptions> { - match self { - FileTypeWriterOptions::Parquet(opt) => Ok(opt), - _ => Err(DataFusionError::Internal(format!( - "Expected parquet options but found options for: {}", - self - ))), - } - } - - /// Tries to extract CsvWriterOptions from this FileTypeWriterOptions enum. - /// Returns an error if a different type from csv is set. - pub fn try_into_csv(&self) -> Result<&CsvWriterOptions> { - match self { - FileTypeWriterOptions::CSV(opt) => Ok(opt), - _ => Err(DataFusionError::Internal(format!( - "Expected csv options but found options for {}", - self - ))), - } - } - - /// Tries to extract JsonWriterOptions from this FileTypeWriterOptions enum. - /// Returns an error if a different type from json is set. - pub fn try_into_json(&self) -> Result<&JsonWriterOptions> { - match self { - FileTypeWriterOptions::JSON(opt) => Ok(opt), - _ => Err(DataFusionError::Internal(format!( - "Expected json options but found options for {}", - self, - ))), - } - } - - /// Tries to extract AvroWriterOptions from this FileTypeWriterOptions enum. - /// Returns an error if a different type from avro is set. - pub fn try_into_avro(&self) -> Result<&AvroWriterOptions> { - match self { - FileTypeWriterOptions::Avro(opt) => Ok(opt), - _ => Err(DataFusionError::Internal(format!( - "Expected avro options but found options for {}!", - self - ))), - } - } - - /// Tries to extract ArrowWriterOptions from this FileTypeWriterOptions enum. - /// Returns an error if a different type from arrow is set. - pub fn try_into_arrow(&self) -> Result<&ArrowWriterOptions> { - match self { - FileTypeWriterOptions::Arrow(opt) => Ok(opt), - _ => Err(DataFusionError::Internal(format!( - "Expected arrow options but found options for {}", - self - ))), - } - } -} - -impl Display for FileTypeWriterOptions { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let name = match self { - FileTypeWriterOptions::Arrow(_) => "ArrowWriterOptions", - FileTypeWriterOptions::Avro(_) => "AvroWriterOptions", - FileTypeWriterOptions::CSV(_) => "CsvWriterOptions", - FileTypeWriterOptions::JSON(_) => "JsonWriterOptions", - #[cfg(feature = "parquet")] - FileTypeWriterOptions::Parquet(_) => "ParquetWriterOptions", - FileTypeWriterOptions::Extension(_) => "ExensionWriterOptions", - }; - write!(f, "{}", name) - } -} #[cfg(test)] #[cfg(feature = "parquet")] diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index ed547782e4a5e..fbda447accb96 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -54,7 +54,6 @@ pub use file_options::file_type::{ FileType, GetExt, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION, }; -pub use file_options::FileTypeWriterOptions; pub use functional_dependencies::{ aggregate_functional_dependencies, get_required_group_by_exprs_indices, get_target_functional_dependencies, Constraint, Constraints, Dependency, diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 3c3bcd497b7f4..55ab279957dcc 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -50,10 +50,9 @@ use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ - Column, DFSchema, DataFusionError, FileType, FileTypeWriterOptions, ParamValues, + Column, DFSchema, DataFusionError, FileType, ParamValues, SchemaError, UnnestOptions, }; -use datafusion_expr::dml::CopyOptions; use datafusion_expr::{ avg, count, is_null, max, median, min, stddev, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE, diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 650f8c844eda3..60360e7fd4d1b 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -140,10 +140,6 @@ impl FileFormat for ArrowFormat { order_requirements, )) as _) } - - fn file_type(&self) -> FileType { - FileType::ARROW - } } /// Implements [`DataSink`] for writing to arrow_ipc files diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index 6d424bf0b28f3..827ec20d15aa5 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -90,9 +90,6 @@ impl FileFormat for AvroFormat { Ok(Arc::new(exec)) } - fn file_type(&self) -> FileType { - FileType::AVRO - } } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 4033bcd3b5572..f4b645995837b 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -285,9 +285,6 @@ impl FileFormat for CsvFormat { )) as _) } - fn file_type(&self) -> FileType { - FileType::CSV - } } impl CsvFormat { diff --git a/datafusion/core/src/datasource/file_format/file_compression_type.rs b/datafusion/core/src/datasource/file_format/file_compression_type.rs index 4340298bed1b0..b2710d248c796 100644 --- a/datafusion/core/src/datasource/file_format/file_compression_type.rs +++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs @@ -231,30 +231,6 @@ pub trait FileTypeExt { fn get_ext_with_compression(&self, c: FileCompressionType) -> Result; } -impl FileTypeExt for FileType { - fn get_ext_with_compression(&self, c: FileCompressionType) -> Result { - let ext = self.get_ext(); - - match self { - FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())), - FileType::AVRO | FileType::ARROW => match c.variant { - UNCOMPRESSED => Ok(ext), - _ => Err(DataFusionError::Internal( - "FileCompressionType can be specified for CSV/JSON FileType.".into(), - )), - }, - #[cfg(feature = "parquet")] - FileType::PARQUET => match c.variant { - UNCOMPRESSED => Ok(ext), - _ => Err(DataFusionError::Internal( - "FileCompressionType can be specified for CSV/JSON FileType.".into(), - )), - }, - FileType::Extension(ext) => ext.extension_with_compression(c.variant), - } - } -} - #[cfg(test)] mod tests { use crate::datasource::file_format::file_compression_type::{ diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index fcb1d5f8e5276..5452c56f79d44 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -188,10 +188,6 @@ impl FileFormat for JsonFormat { order_requirements, )) as _) } - - fn file_type(&self) -> FileType { - FileType::JSON - } } impl Default for JsonSerializer { diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 12c9fb91adb1a..43f4935bf8b2a 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -105,8 +105,6 @@ pub trait FileFormat: Send + Sync + fmt::Debug { not_impl_err!("Writer not implemented for this format") } - /// Returns the FileType corresponding to this FileFormat - fn file_type(&self) -> FileType; } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 7044acccd6dce..1e9bc70d63271 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -291,9 +291,6 @@ impl FileFormat for ParquetFormat { )) as _) } - fn file_type(&self) -> FileType { - FileType::PARQUET - } } fn summarize_min_max( diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 5d73bc4946092..27476bedf727d 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -35,7 +35,7 @@ use crate::datasource::TableProvider; use crate::execution::context::SessionState; use arrow::datatypes::{DataType, SchemaRef}; -use datafusion_common::file_options::{FileTypeWriterOptions, StatementOptions}; +use datafusion_common::file_options::{StatementOptions}; use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, FileType}; use datafusion_expr::CreateExternalTable; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 312183588c456..f2b5e766a66eb 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -66,7 +66,7 @@ use arrow::{ datatypes::{DataType, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, }; -use datafusion_common::{file_options::FileTypeWriterOptions, plan_err}; +use datafusion_common::{plan_err}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::ExecutionPlan; @@ -96,9 +96,7 @@ pub struct FileSinkConfig { /// Controls whether existing data should be overwritten by this sink pub overwrite: bool, /// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size. - /// Note that for externally defined FileTypes, FileTypeWriterOptions contains arbitrary - /// config tuples that must be handled within the physical plan. - pub file_type_writer_options: FileTypeWriterOptions, + pub file_type_writer_options: StatementOptions, } impl FileSinkConfig { diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 549c25f89bae6..ca04fc1c5bb43 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -24,7 +24,7 @@ use std::convert::TryFrom; use std::iter::zip; use std::sync::Arc; -use crate::dml::{CopyOptions, CopyTo}; +use crate::dml::{CopyTo}; use crate::expr::Alias; use crate::expr_rewriter::{ coerce_plan_expr_for_schema, normalize_col, @@ -49,6 +49,7 @@ use crate::{ use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::display::ToStringifiedPlan; +use datafusion_common::file_options::StatementOptions; use datafusion_common::{ get_target_functional_dependencies, plan_datafusion_err, plan_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError, FileType, OwnedTableReference, Result, @@ -238,14 +239,12 @@ impl LogicalPlanBuilder { pub fn copy_to( input: LogicalPlan, output_url: String, - file_format: FileType, single_file_output: bool, - copy_options: CopyOptions, + copy_options: StatementOptions, ) -> Result { Ok(Self::from(LogicalPlan::Copy(CopyTo { input: Arc::new(input), output_url, - file_format, single_file_output, copy_options, }))) diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index 4cd56b89ac635..a781bed2bd8dd 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -21,7 +21,7 @@ use std::{ }; use datafusion_common::{ - file_options::StatementOptions, DFSchemaRef, FileType, FileTypeWriterOptions, + file_options::StatementOptions, DFSchemaRef, FileType, OwnedTableReference, }; @@ -34,55 +34,14 @@ pub struct CopyTo { pub input: Arc, /// The location to write the file(s) pub output_url: String, - /// The file format to output (explicitly defined or inferred from file extension) - pub file_format: FileType, /// If false, it is assumed output_url is a file to which all data should be written /// regardless of input partitioning. Otherwise, output_url is assumed to be a directory /// to which each output partition is written to its own output file pub single_file_output: bool, /// Arbitrary options as tuples - pub copy_options: CopyOptions, + pub copy_options: StatementOptions, } -/// When the logical plan is constructed from SQL, CopyOptions -/// will contain arbitrary string tuples which must be parsed into -/// FileTypeWriterOptions. When the logical plan is constructed directly -/// from rust code (such as via the DataFrame API), FileTypeWriterOptions -/// can be provided directly, avoiding the run time cost and fallibility of -/// parsing string based options. -#[derive(Clone)] -pub enum CopyOptions { - /// Holds StatementOptions parsed from a SQL statement - SQLOptions(StatementOptions), - /// Holds FileTypeWriterOptions directly provided - WriterOptions(Box), -} - -impl PartialEq for CopyOptions { - fn eq(&self, other: &CopyOptions) -> bool { - match self { - Self::SQLOptions(statement1) => match other { - Self::SQLOptions(statement2) => statement1.eq(statement2), - Self::WriterOptions(_) => false, - }, - Self::WriterOptions(_) => false, - } - } -} - -impl Eq for CopyOptions {} - -impl std::hash::Hash for CopyOptions { - fn hash(&self, hasher: &mut H) - where - H: std::hash::Hasher, - { - match self { - Self::SQLOptions(statement) => statement.hash(hasher), - Self::WriterOptions(_) => (), - } - } -} /// The operator that modifies the content of a database (adapted from /// substrait WriteRel) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 9b0f441ef9026..088408fb54007 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -24,7 +24,6 @@ use std::sync::Arc; use super::dml::CopyTo; use super::DdlStatement; -use crate::dml::CopyOptions; use crate::expr::{Alias, Exists, InSubquery, Placeholder, Sort as SortExpr}; use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols}; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; @@ -621,13 +620,11 @@ impl LogicalPlan { LogicalPlan::Copy(CopyTo { input: _, output_url, - file_format, copy_options, single_file_output, }) => Ok(LogicalPlan::Copy(CopyTo { input: Arc::new(inputs[0].clone()), output_url: output_url.clone(), - file_format: file_format.clone(), single_file_output: *single_file_output, copy_options: copy_options.clone(), })), @@ -1552,22 +1549,18 @@ impl LogicalPlan { LogicalPlan::Copy(CopyTo { input: _, output_url, - file_format, single_file_output, copy_options, }) => { - let op_str = match copy_options { - CopyOptions::SQLOptions(statement) => statement - .clone() - .into_inner() - .iter() - .map(|(k, v)| format!("{k} {v}")) - .collect::>() - .join(", "), - CopyOptions::WriterOptions(_) => "".into(), - }; - - write!(f, "CopyTo: format={file_format} output_url={output_url} single_file_output={single_file_output} options: ({op_str})") + let op_str = copy_options + .clone() + .into_inner() + .iter() + .map(|(k, v)| format!("{k} {v}")) + .collect::>() + .join(", "); + + write!(f, "CopyTo: output_url={output_url} single_file_output={single_file_output} options: ({op_str})") } LogicalPlan::Ddl(ddl) => { write!(f, "{}", ddl.display()) diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 12083554f0932..d2127d77c88ca 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -35,7 +35,7 @@ use datafusion_common::{ Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, ScalarValue, SchemaReference, TableReference, ToDFSchema, }; -use datafusion_expr::dml::{CopyOptions, CopyTo}; +use datafusion_expr::dml::{CopyTo}; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; use datafusion_expr::logical_plan::builder::project; use datafusion_expr::logical_plan::DdlStatement; @@ -675,20 +675,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|(s, v)| (s.to_owned(), v.to_string())) .collect::>(); - let mut statement_options = StatementOptions::new(options); - let file_format = statement_options.try_infer_file_type(&statement.target)?; + let mut copy_options = StatementOptions::new(options); let single_file_output = - statement_options.take_bool_option("single_file_output")?; + copy_options.take_bool_option("single_file_output")?; // COPY defaults to outputting a single file if not otherwise specified let single_file_output = single_file_output.unwrap_or(true); - let copy_options = CopyOptions::SQLOptions(statement_options); Ok(LogicalPlan::Copy(CopyTo { input: Arc::new(input), output_url: statement.target, - file_format, single_file_output, copy_options, }))