diff --git a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs index 909227a369ab7..eae30b42194ca 100644 --- a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs @@ -79,15 +79,47 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { pub fn schema_lookup(schema: AvroSchema) -> Result> { match schema { AvroSchema::Record(RecordSchema { - lookup: ref schema_lookup, - .. - }) => Ok(schema_lookup.clone()), + fields, mut lookup, .. + }) => { + for field in fields { + Self::child_schema_lookup(&field.schema, &mut lookup)?; + } + Ok(lookup) + } _ => Err(DataFusionError::ArrowError(SchemaError( "expected avro schema to be a record".to_string(), ))), } } + fn child_schema_lookup<'b>( + schema: &AvroSchema, + schema_lookup: &'b mut BTreeMap, + ) -> Result<&'b BTreeMap> { + match schema { + AvroSchema::Record(RecordSchema { + name, + fields, + lookup, + .. + }) => { + lookup.iter().for_each(|(field_name, pos)| { + schema_lookup + .insert(format!("{}.{}", name.fullname(None), field_name), *pos); + }); + + for field in fields { + Self::child_schema_lookup(&field.schema, schema_lookup)?; + } + } + AvroSchema::Array(schema) => { + Self::child_schema_lookup(schema, schema_lookup)?; + } + _ => (), + } + Ok(schema_lookup) + } + /// Read the next batch of records pub fn next_batch(&mut self, batch_size: usize) -> Option> { let rows_result = self @@ -519,9 +551,10 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { let num_bytes = bit_util::ceil(array_item_count, 8); let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes); let mut struct_index = 0; - let rows: Vec> = rows + let null_struct_array = vec![("null".to_string(), Value::Null)]; + let rows: Vec<&Vec<(String, Value)>> = rows .iter() - .map(|row| { + .flat_map(|row| { if let Value::Array(values) = row { values.iter().for_each(|_| { bit_util::set_bit(&mut null_buffer, struct_index); @@ -529,15 +562,17 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { }); values .iter() - .map(|v| ("".to_string(), v.clone())) - .collect::>() + .map(|v| match v { + Value::Record(record) => record, + other => panic!("expected Record, got {other:?}"), + }) + .collect::>>() } else { struct_index += 1; - vec![("null".to_string(), Value::Null)] + vec![&null_struct_array] } }) .collect(); - let rows = rows.iter().collect::>>(); let arrays = self.build_struct_array(&rows, fields, &[])?; let data_type = DataType::Struct(fields.clone()); ArrayDataBuilder::new(data_type) diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 447df5f90cf72..e82b6e69025f2 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -55,12 +55,12 @@ postgres-protocol = {version = "0.6.4", optional = true} [features] postgres = ["bytes", "chrono", "tokio-postgres", "postgres-types", "postgres-protocol"] +avro = ["datafusion/avro"] [dev-dependencies] env_logger = "0.10" num_cpus = "1.13.0" - [[test]] harness = false name = "sqllogictests" diff --git a/datafusion/sqllogictest/test_files/avro.slt b/datafusion/sqllogictest/test_files/avro.slt index 83b8a22ab7c29..0037c7e2f4c19 100644 --- a/datafusion/sqllogictest/test_files/avro.slt +++ b/datafusion/sqllogictest/test_files/avro.slt @@ -42,6 +42,12 @@ STORED AS AVRO WITH HEADER ROW LOCATION '../../testing/data/avro/single_nan.avro' +statement ok +CREATE EXTERNAL TABLE nested_records +STORED AS AVRO +WITH HEADER ROW +LOCATION '../../testing/data/avro/nested_records.avro' + # test avro query query IT SELECT id, CAST(string_col AS varchar) FROM alltypes_plain @@ -82,6 +88,13 @@ SELECT id, CAST(string_col AS varchar) FROM alltypes_plain_multi_files 0 0 1 1 +# test avro nested records +query ?? +SELECT f1, f2 FROM nested_records +---- +{ns2.record2.f1_1: aaa, ns2.record2.f1_2: 10, ns2.record2.f1_3: {ns3.record3.f1_3_1: 3.14}} [{ns4.record4.f2_1: true, ns4.record4.f2_2: 1.2}, {ns4.record4.f2_1: true, ns4.record4.f2_2: 2.2}] +{ns2.record2.f1_1: bbb, ns2.record2.f1_2: 20, ns2.record2.f1_3: {ns3.record3.f1_3_1: 3.14}} [{ns4.record4.f2_1: false, ns4.record4.f2_2: 10.2}] + # test avro explain query TT EXPLAIN SELECT count(*) from alltypes_plain diff --git a/parquet-testing b/parquet-testing index a11fc8f148f8a..d79a0101d90df 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit a11fc8f148f8a7a89d9281cc0da3eb9d56095fbf +Subproject commit d79a0101d90dfa3bbb10337626f57a3e8c4b5363 diff --git a/testing b/testing index e81d0c6de3594..2c84953c8c277 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit e81d0c6de35948b3be7984af8e00413b314cde6e +Subproject commit 2c84953c8c2779a0dc86ef9ebe8a6cd978125bfe