diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 64b0c2c4b165..67c517ddbc4f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -600,6 +600,119 @@ make_data_page_stats_iterator!( Index::DOUBLE, f64 ); + +macro_rules! get_decimal_page_stats_iterator { + ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => { + struct $iterator_type<'a, I> + where + I: Iterator, + { + iter: I, + } + + impl<'a, I> $iterator_type<'a, I> + where + I: Iterator, + { + fn new(iter: I) -> Self { + Self { iter } + } + } + + impl<'a, I> Iterator for $iterator_type<'a, I> + where + I: Iterator, + { + type Item = Vec>; + + fn next(&mut self) -> Option { + let next = self.iter.next(); + match next { + Some((len, index)) => match index { + Index::INT32(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| { + Some($stat_value_type::from( + x.$func.unwrap_or_default(), + )) + }) + .collect::>(), + ), + Index::INT64(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| { + Some($stat_value_type::from( + x.$func.unwrap_or_default(), + )) + }) + .collect::>(), + ), + Index::BYTE_ARRAY(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| { + Some($convert_func( + x.clone().$func.unwrap_or_default().data(), + )) + }) + .collect::>(), + ), + Index::FIXED_LEN_BYTE_ARRAY(native_index) => Some( + native_index + .indexes + .iter() + .map(|x| { + Some($convert_func( + x.clone().$func.unwrap_or_default().data(), + )) + }) + .collect::>(), + ), + _ => Some(vec![None; len]), + }, + _ => None, + } + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } + } + }; +} + +get_decimal_page_stats_iterator!( + MinDecimal128DataPageStatsIterator, + min, + i128, + from_bytes_to_i128 +); + +get_decimal_page_stats_iterator!( + MaxDecimal128DataPageStatsIterator, + max, + i128, + from_bytes_to_i128 +); + +get_decimal_page_stats_iterator!( + MinDecimal256DataPageStatsIterator, + min, + i256, + from_bytes_to_i256 +); + +get_decimal_page_stats_iterator!( + MaxDecimal256DataPageStatsIterator, + max, + i256, + from_bytes_to_i256 +); make_data_page_stats_iterator!( MinByteArrayDataPageStatsIterator, |x: &PageIndex| { x.min.clone() }, @@ -612,6 +725,7 @@ make_data_page_stats_iterator!( Index::BYTE_ARRAY, ByteArray ); + macro_rules! get_data_page_statistics { ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { paste! { @@ -755,6 +869,10 @@ macro_rules! get_data_page_statistics { ) ) ), + Some(DataType::Decimal128(precision, scale)) => Ok(Arc::new( + Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), + Some(DataType::Decimal256(precision, scale)) => Ok(Arc::new( + Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), _ => unimplemented!() } } diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index eba9687f04a9..47f079063d3c 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -1683,7 +1683,7 @@ async fn test_decimal() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "decimal_col", - check: Check::RowGroup, + check: Check::Both, } .run(); } @@ -1721,7 +1721,7 @@ async fn test_decimal_256() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "decimal256_col", - check: Check::RowGroup, + check: Check::Both, } .run(); }