diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 7fcd41049cb4e..e102cfc372dde 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -30,9 +30,9 @@ use crate::arrow::array::{ }; use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef}; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::physical_plan::{ - DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec, - SchemaAdapterFactory, +use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig, ParquetExec}; +use crate::datasource::schema_adapter::{ + DefaultSchemaAdapterFactory, SchemaAdapterFactory, }; use crate::datasource::statistics::{create_max_min_accs, get_col_stats}; use crate::error::Result; diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 351967d353245..c28788eed4582 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -30,6 +30,7 @@ pub mod listing_table_factory; pub mod memory; pub mod physical_plan; pub mod provider; +pub mod schema_adapter; mod statistics; pub mod stream; pub mod streaming; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 6e19961f60284..720e29e35582d 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -31,19 +31,7 @@ mod statistics; pub(crate) use self::csv::plan_to_csv; pub(crate) use self::json::plan_to_json; #[cfg(feature = "parquet")] -pub use self::parquet::{ - ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter, - SchemaAdapterFactory, SchemaMapper, -}; -#[cfg(feature = "parquet")] -use arrow::{ - array::new_null_array, - compute::{can_cast_types, cast}, - datatypes::Schema, 
- record_batch::{RecordBatch, RecordBatchOptions}, -}; -#[cfg(feature = "parquet")] -use datafusion_common::plan_err; +pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory}; pub use arrow_file::ArrowExec; pub use avro::AvroExec; @@ -247,119 +235,6 @@ where Ok(()) } -#[cfg(feature = "parquet")] -#[derive(Clone, Debug, Default)] -pub(crate) struct DefaultSchemaAdapterFactory {} - -#[cfg(feature = "parquet")] -impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { - fn create(&self, table_schema: SchemaRef) -> Box { - Box::new(DefaultSchemaAdapter { table_schema }) - } -} - -#[cfg(feature = "parquet")] -#[derive(Clone, Debug)] -pub(crate) struct DefaultSchemaAdapter { - /// Schema for the table - table_schema: SchemaRef, -} - -#[cfg(feature = "parquet")] -impl SchemaAdapter for DefaultSchemaAdapter { - /// Map a column index in the table schema to a column index in a particular - /// file schema - /// - /// Panics if index is not in range for the table schema - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { - let field = self.table_schema.field(index); - Some(file_schema.fields.find(field.name())?.0) - } - - /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema. - /// - /// If the provided `file_schema` contains columns of a different type to the expected - /// `table_schema`, the method will attempt to cast the array data from the file schema - /// to the table schema where possible. 
- /// - /// Returns a [`SchemaMapping`] that can be applied to the output batch - /// along with an ordered list of columns to project from the file - fn map_schema( - &self, - file_schema: &Schema, - ) -> Result<(Arc, Vec)> { - let mut projection = Vec::with_capacity(file_schema.fields().len()); - let mut field_mappings = vec![None; self.table_schema.fields().len()]; - - for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if let Some((table_idx, table_field)) = - self.table_schema.fields().find(file_field.name()) - { - match can_cast_types(file_field.data_type(), table_field.data_type()) { - true => { - field_mappings[table_idx] = Some(projection.len()); - projection.push(file_idx); - } - false => { - return plan_err!( - "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", - file_field.name(), - file_field.data_type(), - table_field.data_type() - ) - } - } - } - } - - Ok(( - Arc::new(SchemaMapping { - table_schema: self.table_schema.clone(), - field_mappings, - }), - projection, - )) - } -} - -/// The SchemaMapping struct holds a mapping from the file schema to the table schema -/// and any necessary type conversions that need to be applied. -#[cfg(feature = "parquet")] -#[derive(Debug)] -pub struct SchemaMapping { - /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result. - table_schema: SchemaRef, - /// Mapping from field index in `table_schema` to index in projected file_schema - field_mappings: Vec>, -} - -#[cfg(feature = "parquet")] -impl SchemaMapper for SchemaMapping { - /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. 
- fn map_batch(&self, batch: RecordBatch) -> Result { - let batch_rows = batch.num_rows(); - let batch_cols = batch.columns().to_vec(); - - let cols = self - .table_schema - .fields() - .iter() - .zip(&self.field_mappings) - .map(|(field, file_idx)| match file_idx { - Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()), - None => Ok(new_null_array(field.data_type(), batch_rows)), - }) - .collect::, _>>()?; - - // Necessary to handle empty batches - let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - - let schema = self.table_schema.clone(); - let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; - Ok(record_batch) - } -} - /// A single file or part of a file that should be read, along with its schema, statistics pub struct FileMeta { /// Path for the file (e.g. URL, filesystem path, etc) @@ -621,11 +496,14 @@ mod tests { use arrow_array::cast::AsArray; use arrow_array::types::{Float32Type, Float64Type, UInt32Type}; use arrow_array::{ - BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, StringArray, - UInt64Array, + BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, RecordBatch, + StringArray, UInt64Array, }; - use arrow_schema::Field; + use arrow_schema::{Field, Schema}; + use crate::datasource::schema_adapter::{ + DefaultSchemaAdapterFactory, SchemaAdapterFactory, + }; use chrono::Utc; #[test] diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 17cb6a66c7058..9ee2b3a730dd0 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -27,8 +27,8 @@ use crate::datasource::physical_plan::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; use crate::datasource::physical_plan::{ - parquet::page_filter::PagePruningPredicate, DefaultSchemaAdapterFactory, DisplayAs, - FileGroupPartitioner, FileMeta, 
FileScanConfig, + parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner, + FileMeta, FileScanConfig, }; use crate::{ config::{ConfigOptions, TableParquetOptions}, @@ -67,12 +67,13 @@ mod metrics; mod page_filter; mod row_filter; mod row_groups; -mod schema_adapter; mod statistics; use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet; +use crate::datasource::schema_adapter::{ + DefaultSchemaAdapterFactory, SchemaAdapterFactory, +}; pub use metrics::ParquetFileMetrics; -pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; pub use statistics::{RequestedStatistics, StatisticsConverter}; /// Execution plan for scanning one or more Parquet partitions diff --git a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs deleted file mode 100644 index 193e5161a398e..0000000000000 --- a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs +++ /dev/null @@ -1,69 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use arrow_array::RecordBatch; -use arrow_schema::{Schema, SchemaRef}; -use std::fmt::Debug; -use std::sync::Arc; - -/// Factory of schema adapters. -/// -/// Provides means to implement custom schema adaptation. -pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static { - /// Provides `SchemaAdapter` for the ParquetExec. - fn create(&self, schema: SchemaRef) -> Box; -} - -/// A utility which can adapt file-level record batches to a table schema which may have a schema -/// obtained from merging multiple file-level schemas. -/// -/// This is useful for enabling schema evolution in partitioned datasets. -/// -/// This has to be done in two stages. -/// -/// 1. Before reading the file, we have to map projected column indexes from the table schema to -/// the file schema. -/// -/// 2. After reading a record batch we need to map the read columns back to the expected columns -/// indexes and insert null-valued columns wherever the file schema was missing a colum present -/// in the table schema. -pub trait SchemaAdapter: Send + Sync { - /// Map a column index in the table schema to a column index in a particular - /// file schema - /// - /// Panics if index is not in range for the table schema - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option; - - /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema. - /// - /// If the provided `file_schema` contains columns of a different type to the expected - /// `table_schema`, the method will attempt to cast the array data from the file schema - /// to the table schema where possible. - /// - /// Returns a [`SchemaMapper`] that can be applied to the output batch - /// along with an ordered list of columns to project from the file - fn map_schema( - &self, - file_schema: &Schema, - ) -> datafusion_common::Result<(Arc, Vec)>; -} - -/// Transforms a RecordBatch from Parquet to a RecordBatch that meets the table schema. 
-pub trait SchemaMapper: Send + Sync { - /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. - fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result; -} diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs new file mode 100644 index 0000000000000..36d33379b8877 --- /dev/null +++ b/datafusion/core/src/datasource/schema_adapter.rs @@ -0,0 +1,337 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Schema Adapter provides a method of translating the RecordBatches that come out of the +//! physical format into how they should be used by DataFusion. For instance, a schema +//! can be stored external to a parquet file that maps parquet logical types to arrow types. + +use arrow::compute::{can_cast_types, cast}; +use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions}; +use arrow_schema::{Schema, SchemaRef}; +use datafusion_common::plan_err; +use std::fmt::Debug; +use std::sync::Arc; + +/// Factory of schema adapters. +/// +/// Provides means to implement custom schema adaptation. +pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static { + /// Provides `SchemaAdapter`. 
+ fn create(&self, schema: SchemaRef) -> Box; +} + +/// A utility which can adapt file-level record batches to a table schema which may have a schema +/// obtained from merging multiple file-level schemas. +/// +/// This is useful for enabling schema evolution in partitioned datasets. +/// +/// This has to be done in two stages. +/// +/// 1. Before reading the file, we have to map projected column indexes from the table schema to +/// the file schema. +/// +/// 2. After reading a record batch we need to map the read columns back to the expected columns +/// indexes and insert null-valued columns wherever the file schema was missing a column present +/// in the table schema. +pub trait SchemaAdapter: Send + Sync { + /// Map a column index in the table schema to a column index in a particular + /// file schema + /// + /// Panics if index is not in range for the table schema + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option; + + /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema. + /// + /// If the provided `file_schema` contains columns of a different type to the expected + /// `table_schema`, the method will attempt to cast the array data from the file schema + /// to the table schema where possible. + /// + /// Returns a [`SchemaMapper`] that can be applied to the output batch + /// along with an ordered list of columns to project from the file + fn map_schema( + &self, + file_schema: &Schema, + ) -> datafusion_common::Result<(Arc, Vec)>; +} + +/// Transforms a RecordBatch from the physical layer to a RecordBatch that meets the table schema. +pub trait SchemaMapper: Send + Sync { + /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. 
+ fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result; +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct DefaultSchemaAdapterFactory {} + +impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { + fn create(&self, table_schema: SchemaRef) -> Box { + Box::new(DefaultSchemaAdapter { table_schema }) + } +} + +#[derive(Clone, Debug)] +pub(crate) struct DefaultSchemaAdapter { + /// Schema for the table + table_schema: SchemaRef, +} + +impl SchemaAdapter for DefaultSchemaAdapter { + /// Map a column index in the table schema to a column index in a particular + /// file schema + /// + /// Panics if index is not in range for the table schema + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { + let field = self.table_schema.field(index); + Some(file_schema.fields.find(field.name())?.0) + } + + /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema. + /// + /// If the provided `file_schema` contains columns of a different type to the expected + /// `table_schema`, the method will attempt to cast the array data from the file schema + /// to the table schema where possible. 
+ /// + /// Returns a [`SchemaMapping`] that can be applied to the output batch + /// along with an ordered list of columns to project from the file + fn map_schema( + &self, + file_schema: &Schema, + ) -> datafusion_common::Result<(Arc, Vec)> { + let mut projection = Vec::with_capacity(file_schema.fields().len()); + let mut field_mappings = vec![None; self.table_schema.fields().len()]; + + for (file_idx, file_field) in file_schema.fields.iter().enumerate() { + if let Some((table_idx, table_field)) = + self.table_schema.fields().find(file_field.name()) + { + match can_cast_types(file_field.data_type(), table_field.data_type()) { + true => { + field_mappings[table_idx] = Some(projection.len()); + projection.push(file_idx); + } + false => { + return plan_err!( + "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", + file_field.name(), + file_field.data_type(), + table_field.data_type() + ) + } + } + } + } + + Ok(( + Arc::new(SchemaMapping { + table_schema: self.table_schema.clone(), + field_mappings, + }), + projection, + )) + } +} + +/// The SchemaMapping struct holds a mapping from the file schema to the table schema +/// and any necessary type conversions that need to be applied. +#[derive(Debug)] +pub struct SchemaMapping { + /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result. + table_schema: SchemaRef, + /// Mapping from field index in `table_schema` to index in projected file_schema + field_mappings: Vec>, +} + +impl SchemaMapper for SchemaMapping { + /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. 
+ fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { + let batch_rows = batch.num_rows(); + let batch_cols = batch.columns().to_vec(); + + let cols = self + .table_schema + .fields() + .iter() + .zip(&self.field_mappings) + .map(|(field, file_idx)| match file_idx { + Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()), + None => Ok(new_null_array(field.data_type(), batch_rows)), + }) + .collect::, _>>()?; + + // Necessary to handle empty batches + let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); + + let schema = self.table_schema.clone(); + let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; + Ok(record_batch) + } +} + +#[cfg(test)] +mod tests { + use std::fs; + use std::sync::Arc; + + use crate::assert_batches_sorted_eq; + use arrow::datatypes::{Field, Schema}; + use arrow::record_batch::RecordBatch; + use arrow_array::{Int32Array, StringArray}; + use arrow_schema::{DataType, SchemaRef}; + use object_store::path::Path; + use object_store::ObjectMeta; + + use crate::datasource::object_store::ObjectStoreUrl; + use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; + use crate::physical_plan::collect; + use crate::prelude::SessionContext; + + use crate::datasource::listing::PartitionedFile; + use crate::datasource::schema_adapter::{ + SchemaAdapter, SchemaAdapterFactory, SchemaMapper, + }; + use parquet::arrow::ArrowWriter; + use tempfile::TempDir; + + #[tokio::test] + async fn can_override_schema_adapter() { + // Test shows that SchemaAdapter can add a column that doesn't exist in the + // record batches returned from parquet. This can be useful for schema evolution + // where older files may not have all columns. 
+ let tmp_dir = TempDir::new().unwrap(); + let table_dir = tmp_dir.path().join("parquet_test"); + fs::DirBuilder::new().create(table_dir.as_path()).unwrap(); + let f1 = Field::new("id", DataType::Int32, true); + + let file_schema = Arc::new(Schema::new(vec![f1.clone()])); + let filename = "part.parquet".to_string(); + let path = table_dir.as_path().join(filename.clone()); + let file = fs::File::create(path.clone()).unwrap(); + let mut writer = ArrowWriter::try_new(file, file_schema.clone(), None).unwrap(); + + let ids = Arc::new(Int32Array::from(vec![1i32])); + let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap(); + + writer.write(&rec_batch).unwrap(); + writer.close().unwrap(); + + let location = Path::parse(path.to_str().unwrap()).unwrap(); + let metadata = std::fs::metadata(path.as_path()).expect("Local file metadata"); + let meta = ObjectMeta { + location, + last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), + size: metadata.len() as usize, + e_tag: None, + version: None, + }; + + let partitioned_file = PartitionedFile { + object_meta: meta, + partition_values: vec![], + range: None, + statistics: None, + extensions: None, + }; + + let f1 = Field::new("id", DataType::Int32, true); + let f2 = Field::new("extra_column", DataType::Utf8, true); + + let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); + + // prepare the scan + let parquet_exec = ParquetExec::new( + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema) + .with_file(partitioned_file), + None, + None, + Default::default(), + ) + .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})); + + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap(); + + let expected = [ + "+----+--------------+", + "| id | extra_column |", + "+----+--------------+", + "| 1 | foo |", + "+----+--------------+", + ]; + + 
assert_batches_sorted_eq!(expected, &read); + } + + #[derive(Debug)] + struct TestSchemaAdapterFactory {} + + impl SchemaAdapterFactory for TestSchemaAdapterFactory { + fn create(&self, schema: SchemaRef) -> Box { + Box::new(TestSchemaAdapter { + table_schema: schema, + }) + } + } + + struct TestSchemaAdapter { + /// Schema for the table + table_schema: SchemaRef, + } + + impl SchemaAdapter for TestSchemaAdapter { + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { + let field = self.table_schema.field(index); + Some(file_schema.fields.find(field.name())?.0) + } + + fn map_schema( + &self, + file_schema: &Schema, + ) -> datafusion_common::Result<(Arc, Vec)> { + let mut projection = Vec::with_capacity(file_schema.fields().len()); + + for (file_idx, file_field) in file_schema.fields.iter().enumerate() { + if self.table_schema.fields().find(file_field.name()).is_some() { + projection.push(file_idx); + } + } + + Ok((Arc::new(TestSchemaMapping {}), projection)) + } + } + + #[derive(Debug)] + struct TestSchemaMapping {} + + impl SchemaMapper for TestSchemaMapping { + fn map_batch( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result { + let f1 = Field::new("id", DataType::Int32, true); + let f2 = Field::new("extra_column", DataType::Utf8, true); + + let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); + + let extra_column = Arc::new(StringArray::from(vec!["foo"])); + let mut new_columns = batch.columns().to_vec(); + new_columns.push(extra_column); + + Ok(RecordBatch::try_new(schema, new_columns).unwrap()) + } + } +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index c5d0ad60bc10f..9353ed7d86ecd 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -49,7 +49,6 @@ mod filter_pushdown; mod page_pruning; mod row_group_pruning; mod schema; -mod schema_adapter; mod schema_coercion; #[cfg(test)] diff --git 
a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs deleted file mode 100644 index ead2884e43c5d..0000000000000 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ /dev/null @@ -1,163 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use std::fs; -use std::sync::Arc; - -use arrow::datatypes::{Field, Schema}; -use arrow::record_batch::RecordBatch; -use arrow_array::{Int32Array, StringArray}; -use arrow_schema::{DataType, SchemaRef}; -use datafusion::assert_batches_sorted_eq; -use object_store::path::Path; -use object_store::ObjectMeta; - -use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::{ - FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory, SchemaMapper, -}; -use datafusion::physical_plan::collect; -use datafusion::prelude::SessionContext; - -use datafusion::datasource::listing::PartitionedFile; -use parquet::arrow::ArrowWriter; -use tempfile::TempDir; - -#[tokio::test] -async fn can_override_schema_adapter() { - // Create several parquet files in same directoty / table with - // same schema but different metadata - let tmp_dir = TempDir::new().unwrap(); - let table_dir = tmp_dir.path().join("parquet_test"); - fs::DirBuilder::new().create(table_dir.as_path()).unwrap(); - let f1 = Field::new("id", DataType::Int32, true); - - let file_schema = Arc::new(Schema::new(vec![f1.clone()])); - let filename = "part.parquet".to_string(); - let path = table_dir.as_path().join(filename.clone()); - let file = fs::File::create(path.clone()).unwrap(); - let mut writer = ArrowWriter::try_new(file, file_schema.clone(), None).unwrap(); - - let ids = Arc::new(Int32Array::from(vec![1i32])); - let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap(); - - writer.write(&rec_batch).unwrap(); - writer.close().unwrap(); - - let location = Path::parse(path.to_str().unwrap()).unwrap(); - let metadata = std::fs::metadata(path.as_path()).expect("Local file metadata"); - let meta = ObjectMeta { - location, - last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), - size: metadata.len() as usize, - e_tag: None, - version: None, - }; - - let partitioned_file = PartitionedFile { - object_meta: meta, - partition_values: 
vec![], - range: None, - statistics: None, - extensions: None, - }; - - let f1 = Field::new("id", DataType::Int32, true); - let f2 = Field::new("extra_column", DataType::Utf8, true); - - let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); - - // prepare the scan - let parquet_exec = ParquetExec::new( - FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema) - .with_file(partitioned_file), - None, - None, - Default::default(), - ) - .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})); - - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap(); - - let expected = [ - "+----+--------------+", - "| id | extra_column |", - "+----+--------------+", - "| 1 | foo |", - "+----+--------------+", - ]; - - assert_batches_sorted_eq!(expected, &read); -} - -#[derive(Debug)] -struct TestSchemaAdapterFactory {} - -impl SchemaAdapterFactory for TestSchemaAdapterFactory { - fn create(&self, schema: SchemaRef) -> Box { - Box::new(TestSchemaAdapter { - table_schema: schema, - }) - } -} - -struct TestSchemaAdapter { - /// Schema for the table - table_schema: SchemaRef, -} - -impl SchemaAdapter for TestSchemaAdapter { - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { - let field = self.table_schema.field(index); - Some(file_schema.fields.find(field.name())?.0) - } - - fn map_schema( - &self, - file_schema: &Schema, - ) -> datafusion_common::Result<(Arc, Vec)> { - let mut projection = Vec::with_capacity(file_schema.fields().len()); - - for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if self.table_schema.fields().find(file_field.name()).is_some() { - projection.push(file_idx); - } - } - - Ok((Arc::new(TestSchemaMapping {}), projection)) - } -} - -#[derive(Debug)] -struct TestSchemaMapping {} - -impl SchemaMapper for TestSchemaMapping { - fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result 
{ - let f1 = Field::new("id", DataType::Int32, true); - let f2 = Field::new("extra_column", DataType::Utf8, true); - - let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); - - let extra_column = Arc::new(StringArray::from(vec!["foo"])); - let mut new_columns = batch.columns().to_vec(); - new_columns.push(extra_column); - - Ok(RecordBatch::try_new(schema, new_columns).unwrap()) - } -}