diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 5ada61e93d62..14a475859810 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -34,306 +34,322 @@ use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, In use crate::errors::{ParquetError, Result}; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type}; -/// Create array reader from parquet schema, projection mask, and parquet file reader. -pub fn build_array_reader( - field: Option<&ParquetField>, - mask: &ProjectionMask, - row_groups: &dyn RowGroups, -) -> Result> { - let reader = field - .and_then(|field| build_reader(field, mask, row_groups).transpose()) - .transpose()? - .unwrap_or_else(|| make_empty_array_reader(row_groups.num_rows())); - - Ok(reader) +/// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader +pub(crate) struct ArrayReaderBuilder<'a> { + row_groups: &'a dyn RowGroups, } -fn build_reader( - field: &ParquetField, - mask: &ProjectionMask, - row_groups: &dyn RowGroups, -) -> Result>> { - match field.field_type { - ParquetFieldType::Primitive { .. } => build_primitive_reader(field, mask, row_groups), - ParquetFieldType::Group { .. } => match &field.arrow_type { - DataType::Map(_, _) => build_map_reader(field, mask, row_groups), - DataType::Struct(_) => build_struct_reader(field, mask, row_groups), - DataType::List(_) => build_list_reader(field, mask, false, row_groups), - DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups), - DataType::FixedSizeList(_, _) => build_fixed_size_list_reader(field, mask, row_groups), - d => unimplemented!("reading group type {} not implemented", d), - }, +impl<'a> ArrayReaderBuilder<'a> { + pub(crate) fn new(row_groups: &'a dyn RowGroups) -> Self { + Self { row_groups } } -} -/// Build array reader for map type. -fn build_map_reader( - field: &ParquetField, - mask: &ProjectionMask, - row_groups: &dyn RowGroups, -) -> Result>> { - let children = field.children().unwrap(); - assert_eq!(children.len(), 2); - - let key_reader = build_reader(&children[0], mask, row_groups)?; - let value_reader = build_reader(&children[1], mask, row_groups)?; - - match (key_reader, value_reader) { - (Some(key_reader), Some(value_reader)) => { - // Need to retrieve underlying data type to handle projection - let key_type = key_reader.get_data_type().clone(); - let value_type = value_reader.get_data_type().clone(); - - let data_type = match &field.arrow_type { - DataType::Map(map_field, is_sorted) => match map_field.data_type() { - DataType::Struct(fields) => { - assert_eq!(fields.len(), 2); - let struct_field = map_field.as_ref().clone().with_data_type( - DataType::Struct(Fields::from(vec![ - fields[0].as_ref().clone().with_data_type(key_type), - fields[1].as_ref().clone().with_data_type(value_type), - ])), - ); - DataType::Map(Arc::new(struct_field), *is_sorted) - } - _ => unreachable!(), - }, - _ => unreachable!(), - }; - - Ok(Some(Box::new(MapArrayReader::new( - key_reader, - value_reader, - data_type, - field.def_level, - field.rep_level, - field.nullable, - )))) + /// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader. + pub fn build_array_reader( + &self, + field: Option<&ParquetField>, + mask: &ProjectionMask, + ) -> Result> { + let reader = field + .and_then(|field| self.build_reader(field, mask).transpose()) + .transpose()? + .unwrap_or_else(|| make_empty_array_reader(self.num_rows())); + + Ok(reader) + } + + /// Return the total number of rows + fn num_rows(&self) -> usize { + self.row_groups.num_rows() + } + + fn build_reader( + &self, + field: &ParquetField, + mask: &ProjectionMask, + ) -> Result>> { + match field.field_type { + ParquetFieldType::Primitive { .. } => self.build_primitive_reader(field, mask), + ParquetFieldType::Group { .. } => match &field.arrow_type { + DataType::Map(_, _) => self.build_map_reader(field, mask), + DataType::Struct(_) => self.build_struct_reader(field, mask), + DataType::List(_) => self.build_list_reader(field, mask, false), + DataType::LargeList(_) => self.build_list_reader(field, mask, true), + DataType::FixedSizeList(_, _) => self.build_fixed_size_list_reader(field, mask), + d => unimplemented!("reading group type {} not implemented", d), + }, } - (None, None) => Ok(None), - _ => Err(general_err!( - "partial projection of MapArray is not supported" - )), } -} -/// Build array reader for list type. -fn build_list_reader( - field: &ParquetField, - mask: &ProjectionMask, - is_large: bool, - row_groups: &dyn RowGroups, -) -> Result>> { - let children = field.children().unwrap(); - assert_eq!(children.len(), 1); - - let reader = match build_reader(&children[0], mask, row_groups)? { - Some(item_reader) => { - // Need to retrieve underlying data type to handle projection - let item_type = item_reader.get_data_type().clone(); - let data_type = match &field.arrow_type { - DataType::List(f) => { - DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type))) - } - DataType::LargeList(f) => { - DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type))) - } - _ => unreachable!(), - }; + /// Build array reader for map type. + fn build_map_reader( + &self, + field: &ParquetField, + mask: &ProjectionMask, + ) -> Result>> { + let children = field.children().unwrap(); + assert_eq!(children.len(), 2); + + let key_reader = self.build_reader(&children[0], mask)?; + let value_reader = self.build_reader(&children[1], mask)?; + + match (key_reader, value_reader) { + (Some(key_reader), Some(value_reader)) => { + // Need to retrieve underlying data type to handle projection + let key_type = key_reader.get_data_type().clone(); + let value_type = value_reader.get_data_type().clone(); + + let data_type = match &field.arrow_type { + DataType::Map(map_field, is_sorted) => match map_field.data_type() { + DataType::Struct(fields) => { + assert_eq!(fields.len(), 2); + let struct_field = map_field.as_ref().clone().with_data_type( + DataType::Struct(Fields::from(vec![ + fields[0].as_ref().clone().with_data_type(key_type), + fields[1].as_ref().clone().with_data_type(value_type), + ])), + ); + DataType::Map(Arc::new(struct_field), *is_sorted) + } + _ => unreachable!(), + }, + _ => unreachable!(), + }; - let reader = match is_large { - false => Box::new(ListArrayReader::::new( - item_reader, - data_type, - field.def_level, - field.rep_level, - field.nullable, - )) as _, - true => Box::new(ListArrayReader::::new( - item_reader, + Ok(Some(Box::new(MapArrayReader::new( + key_reader, + value_reader, data_type, field.def_level, field.rep_level, field.nullable, - )) as _, - }; - Some(reader) + )))) + } + (None, None) => Ok(None), + _ => Err(general_err!( + "partial projection of MapArray is not supported" + )), } - None => None, - }; - Ok(reader) -} + } -/// Build array reader for fixed-size list type. -fn build_fixed_size_list_reader( - field: &ParquetField, - mask: &ProjectionMask, - row_groups: &dyn RowGroups, -) -> Result>> { - let children = field.children().unwrap(); - assert_eq!(children.len(), 1); - - let reader = match build_reader(&children[0], mask, row_groups)? { - Some(item_reader) => { - let item_type = item_reader.get_data_type().clone(); - let reader = match &field.arrow_type { - &DataType::FixedSizeList(ref f, size) => { - let data_type = DataType::FixedSizeList( - Arc::new(f.as_ref().clone().with_data_type(item_type)), - size, - ); - - Box::new(FixedSizeListArrayReader::new( + /// Build array reader for list type. + fn build_list_reader( + &self, + field: &ParquetField, + mask: &ProjectionMask, + is_large: bool, + ) -> Result>> { + let children = field.children().unwrap(); + assert_eq!(children.len(), 1); + + let reader = match self.build_reader(&children[0], mask)? { + Some(item_reader) => { + // Need to retrieve underlying data type to handle projection + let item_type = item_reader.get_data_type().clone(); + let data_type = match &field.arrow_type { + DataType::List(f) => { + DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type))) + } + DataType::LargeList(f) => { + DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type))) + } + _ => unreachable!(), + }; + + let reader = match is_large { + false => Box::new(ListArrayReader::::new( item_reader, - size as usize, data_type, field.def_level, field.rep_level, field.nullable, - )) as _ - } - _ => unimplemented!(), - }; - Some(reader) + )) as _, + true => Box::new(ListArrayReader::::new( + item_reader, + data_type, + field.def_level, + field.rep_level, + field.nullable, + )) as _, + }; + Some(reader) + } + None => None, + }; + Ok(reader) + } + + /// Build array reader for fixed-size list type. + fn build_fixed_size_list_reader( + &self, + field: &ParquetField, + mask: &ProjectionMask, + ) -> Result>> { + let children = field.children().unwrap(); + assert_eq!(children.len(), 1); + + let reader = match self.build_reader(&children[0], mask)? { + Some(item_reader) => { + let item_type = item_reader.get_data_type().clone(); + let reader = match &field.arrow_type { + &DataType::FixedSizeList(ref f, size) => { + let data_type = DataType::FixedSizeList( + Arc::new(f.as_ref().clone().with_data_type(item_type)), + size, + ); + + Box::new(FixedSizeListArrayReader::new( + item_reader, + size as usize, + data_type, + field.def_level, + field.rep_level, + field.nullable, + )) as _ + } + _ => unimplemented!(), + }; + Some(reader) + } + None => None, + }; + Ok(reader) + } + + /// Creates primitive array reader for each primitive type. + fn build_primitive_reader( + &self, + field: &ParquetField, + mask: &ProjectionMask, + ) -> Result>> { + let (col_idx, primitive_type) = match &field.field_type { + ParquetFieldType::Primitive { + col_idx, + primitive_type, + } => match primitive_type.as_ref() { + Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()), + Type::GroupType { .. } => unreachable!(), + }, + _ => unreachable!(), + }; + + if !mask.leaf_included(col_idx) { + return Ok(None); } - None => None, - }; - Ok(reader) -} -/// Creates primitive array reader for each primitive type. -fn build_primitive_reader( - field: &ParquetField, - mask: &ProjectionMask, - row_groups: &dyn RowGroups, -) -> Result>> { - let (col_idx, primitive_type) = match &field.field_type { - ParquetFieldType::Primitive { - col_idx, + let physical_type = primitive_type.get_physical_type(); + + // We don't track the column path in ParquetField as it adds a potential source + // of bugs when the arrow mapping converts more than one level in the parquet + // schema into a single arrow field. + // + // None of the readers actually use this field, but it is required for this type, + // so just stick a placeholder in + let column_desc = Arc::new(ColumnDescriptor::new( primitive_type, - } => match primitive_type.as_ref() { - Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()), - Type::GroupType { .. } => unreachable!(), - }, - _ => unreachable!(), - }; - - if !mask.leaf_included(col_idx) { - return Ok(None); + field.def_level, + field.rep_level, + ColumnPath::new(vec![]), + )); + + let page_iterator = self.row_groups.column_chunks(col_idx)?; + let arrow_type = Some(field.arrow_type.clone()); + + let reader = match physical_type { + PhysicalType::BOOLEAN => Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + arrow_type, + )?) as _, + PhysicalType::INT32 => { + if let Some(DataType::Null) = arrow_type { + Box::new(NullArrayReader::::new( + page_iterator, + column_desc, + )?) as _ + } else { + Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + arrow_type, + )?) as _ + } + } + PhysicalType::INT64 => Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + arrow_type, + )?) as _, + PhysicalType::INT96 => Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + arrow_type, + )?) as _, + PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + arrow_type, + )?) as _, + PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + arrow_type, + )?) as _, + PhysicalType::BYTE_ARRAY => match arrow_type { + Some(DataType::Dictionary(_, _)) => { + make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)? + } + Some(DataType::Utf8View | DataType::BinaryView) => { + make_byte_view_array_reader(page_iterator, column_desc, arrow_type)? + } + _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?, + }, + PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type { + Some(DataType::Dictionary(_, _)) => { + make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)? + } + _ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?, + }, + }; + Ok(Some(reader)) } - let physical_type = primitive_type.get_physical_type(); - - // We don't track the column path in ParquetField as it adds a potential source - // of bugs when the arrow mapping converts more than one level in the parquet - // schema into a single arrow field. - // - // None of the readers actually use this field, but it is required for this type, - // so just stick a placeholder in - let column_desc = Arc::new(ColumnDescriptor::new( - primitive_type, - field.def_level, - field.rep_level, - ColumnPath::new(vec![]), - )); - - let page_iterator = row_groups.column_chunks(col_idx)?; - let arrow_type = Some(field.arrow_type.clone()); - - let reader = match physical_type { - PhysicalType::BOOLEAN => Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - arrow_type, - )?) as _, - PhysicalType::INT32 => { - if let Some(DataType::Null) = arrow_type { - Box::new(NullArrayReader::::new( - page_iterator, - column_desc, - )?) as _ - } else { - Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - arrow_type, - )?) as _ + fn build_struct_reader( + &self, + field: &ParquetField, + mask: &ProjectionMask, + ) -> Result>> { + let arrow_fields = match &field.arrow_type { + DataType::Struct(children) => children, + _ => unreachable!(), + }; + let children = field.children().unwrap(); + assert_eq!(arrow_fields.len(), children.len()); + + let mut readers = Vec::with_capacity(children.len()); + let mut builder = SchemaBuilder::with_capacity(children.len()); + + for (arrow, parquet) in arrow_fields.iter().zip(children) { + if let Some(reader) = self.build_reader(parquet, mask)? { + // Need to retrieve underlying data type to handle projection + let child_type = reader.get_data_type().clone(); + builder.push(arrow.as_ref().clone().with_data_type(child_type)); + readers.push(reader); } } - PhysicalType::INT64 => Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - arrow_type, - )?) as _, - PhysicalType::INT96 => Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - arrow_type, - )?) as _, - PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - arrow_type, - )?) as _, - PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - arrow_type, - )?) as _, - PhysicalType::BYTE_ARRAY => match arrow_type { - Some(DataType::Dictionary(_, _)) => { - make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)? - } - Some(DataType::Utf8View | DataType::BinaryView) => { - make_byte_view_array_reader(page_iterator, column_desc, arrow_type)? - } - _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?, - }, - PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type { - Some(DataType::Dictionary(_, _)) => { - make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)? - } - _ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?, - }, - }; - Ok(Some(reader)) -} -fn build_struct_reader( - field: &ParquetField, - mask: &ProjectionMask, - row_groups: &dyn RowGroups, -) -> Result>> { - let arrow_fields = match &field.arrow_type { - DataType::Struct(children) => children, - _ => unreachable!(), - }; - let children = field.children().unwrap(); - assert_eq!(arrow_fields.len(), children.len()); - - let mut readers = Vec::with_capacity(children.len()); - let mut builder = SchemaBuilder::with_capacity(children.len()); - - for (arrow, parquet) in arrow_fields.iter().zip(children) { - if let Some(reader) = build_reader(parquet, mask, row_groups)? { - // Need to retrieve underlying data type to handle projection - let child_type = reader.get_data_type().clone(); - builder.push(arrow.as_ref().clone().with_data_type(child_type)); - readers.push(reader); + if readers.is_empty() { + return Ok(None); } - } - if readers.is_empty() { - return Ok(None); + Ok(Some(Box::new(StructArrayReader::new( + DataType::Struct(builder.finish().fields), + readers, + field.def_level, + field.rep_level, + field.nullable, + )))) } - - Ok(Some(Box::new(StructArrayReader::new( - DataType::Struct(builder.finish().fields), - readers, - field.def_level, - field.rep_level, - field.nullable, - )))) } #[cfg(test)] @@ -359,7 +375,9 @@ mod tests { ) .unwrap(); - let array_reader = build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap(); + let array_reader = ArrayReaderBuilder::new(&file_reader) + .build_array_reader(fields.as_ref(), &mask) + .unwrap(); // Create arrow types let arrow_type = DataType::Struct(Fields::from(vec![Field::new( diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 06448d5f6ff2..66c4f30b3c29 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -246,9 +246,9 @@ impl ArrayReader for ListArrayReader { #[cfg(test)] mod tests { use super::*; - use crate::arrow::array_reader::build_array_reader; use crate::arrow::array_reader::list_array::ListArrayReader; use crate::arrow::array_reader::test_util::InMemoryArrayReader; + use crate::arrow::array_reader::ArrayReaderBuilder; use crate::arrow::schema::parquet_to_arrow_schema_and_fields; use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask}; use crate::file::properties::WriterProperties; @@ -563,7 +563,9 @@ mod tests { ) .unwrap(); - let mut array_reader = build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap(); + let mut array_reader = ArrayReaderBuilder::new(&file_reader) + .build_array_reader(fields.as_ref(), &mask) + .unwrap(); let batch = array_reader.next_batch(100).unwrap(); assert_eq!(batch.data_type(), array_reader.get_data_type()); diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index a5ea426e95bb..94d61c9eacf5 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -45,7 +45,7 @@ mod struct_array; #[cfg(test)] mod test_util; -pub use builder::build_array_reader; +pub(crate) use builder::ArrayReaderBuilder; pub use byte_array::make_byte_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; #[allow(unused_imports)] // Only used for benchmarks @@ -111,7 +111,8 @@ pub trait RowGroups { /// Get the number of rows in this collection fn num_rows(&self) -> usize; - /// Returns a [`PageIterator`] for the column chunks with the given leaf column index + /// Returns a [`PageIterator`] for all pages in the specified column chunk + /// across all row groups in this collection. fn column_chunks(&self, i: usize) -> Result>; } diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index ea068acb29eb..a8688e8af83c 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -26,7 +26,7 @@ pub use selection::{RowSelection, RowSelector}; use std::sync::Arc; pub use crate::arrow::array_reader::RowGroups; -use crate::arrow::array_reader::{build_array_reader, ArrayReader}; +use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder}; use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; use crate::column::page::{PageIterator, PageReader}; @@ -690,14 +690,16 @@ impl ParquetRecordBatchReaderBuilder { break; } - let array_reader = - build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?; + let array_reader = ArrayReaderBuilder::new(&reader) + .build_array_reader(self.fields.as_deref(), predicate.projection())?; plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; } } - let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?; + let array_reader = ArrayReaderBuilder::new(&reader) + .build_array_reader(self.fields.as_deref(), &self.projection)?; + let read_plan = plan_builder .limited(reader.num_rows()) .with_offset(self.offset) @@ -896,8 +898,8 @@ impl ParquetRecordBatchReader { batch_size: usize, selection: Option, ) -> Result { - let array_reader = - build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?; + let array_reader = ArrayReaderBuilder::new(row_groups) + .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?; let read_plan = ReadPlanBuilder::new(batch_size) .with_selection(selection) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index ac4d24ee27dc..611d6999e07e 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -38,7 +38,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Fields, Schema, SchemaRef}; -use crate::arrow::array_reader::{build_array_reader, RowGroups}; +use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroups}; use crate::arrow::arrow_reader::{ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, @@ -613,8 +613,8 @@ where .fetch(&mut self.input, predicate.projection(), selection) .await?; - let array_reader = - build_array_reader(self.fields.as_deref(), predicate.projection(), &row_group)?; + let array_reader = ArrayReaderBuilder::new(&row_group) + .build_array_reader(self.fields.as_deref(), predicate.projection())?; plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; } @@ -661,7 +661,9 @@ where let plan = plan_builder.build(); - let array_reader = build_array_reader(self.fields.as_deref(), &projection, &row_group)?; + let array_reader = ArrayReaderBuilder::new(&row_group) + .build_array_reader(self.fields.as_deref(), &projection)?; + let reader = ParquetRecordBatchReader::new(array_reader, plan); Ok((self, Some(reader)))