Skip to content

Commit 10948ca

Browse files
Lordworms and alamb authored
Support Decimal and Decimal256 Parquet Data Page Statistics (#11138)
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 64b8eea commit 10948ca

File tree

2 files changed

+120
-2
lines changed

2 files changed

+120
-2
lines changed

datafusion/core/src/datasource/physical_plan/parquet/statistics.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,119 @@ make_data_page_stats_iterator!(
600600
Index::DOUBLE,
601601
f64
602602
);
603+
604+
/// Defines an iterator adapter named `$iterator_type` that turns per-column
/// page-index entries into per-data-page decimal statistics.
///
/// Macro parameters:
/// * `$iterator_type` - name of the generated struct.
/// * `$func` - which `PageIndex` field to read (`min` or `max`).
/// * `$stat_value_type` - native decimal representation produced
///   (e.g. `i128` or `i256`); it must implement `From<i32>` and `From<i64>`.
/// * `$convert_func` - function converting the raw bytes of a
///   `BYTE_ARRAY` / `FIXED_LEN_BYTE_ARRAY` statistic into `$stat_value_type`.
///
/// The generated iterator wraps any `Iterator<Item = (usize, &Index)>` and
/// yields one `Vec<Option<$stat_value_type>>` per input item, holding one
/// entry per page of that index. For index variants other than `INT32`,
/// `INT64`, `BYTE_ARRAY` and `FIXED_LEN_BYTE_ARRAY`, it yields `len` `None`s,
/// where `len` is the `usize` paired with the index.
///
/// A page whose statistic is absent yields `None`. Substituting a default
/// value would report a fabricated zero bound for that page.
macro_rules! get_decimal_page_stats_iterator {
    ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => {
        struct $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            iter: I,
        }

        impl<'a, I> $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            fn new(iter: I) -> Self {
                Self { iter }
            }
        }

        impl<'a, I> Iterator for $iterator_type<'a, I>
        where
            I: Iterator<Item = (usize, &'a Index)>,
        {
            type Item = Vec<Option<$stat_value_type>>;

            fn next(&mut self) -> Option<Self::Item> {
                let (len, index) = self.iter.next()?;
                let page_stats = match index {
                    // Integer-backed decimals: widen the stored value, keeping
                    // a missing statistic as `None`.
                    Index::INT32(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|x| x.$func.map($stat_value_type::from))
                        .collect::<Vec<_>>(),
                    Index::INT64(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|x| x.$func.map($stat_value_type::from))
                        .collect::<Vec<_>>(),
                    // Byte-backed decimals: borrow the stored bytes instead of
                    // cloning the page entry, and only decode when a value is
                    // actually present.
                    Index::BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|x| {
                            x.$func.as_ref().map(|bytes| $convert_func(bytes.data()))
                        })
                        .collect::<Vec<_>>(),
                    Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
                        .indexes
                        .iter()
                        .map(|x| {
                            x.$func.as_ref().map(|bytes| $convert_func(bytes.data()))
                        })
                        .collect::<Vec<_>>(),
                    // Any other index variant carries no decimal statistics.
                    _ => vec![None; len],
                };
                Some(page_stats)
            }

            fn size_hint(&self) -> (usize, Option<usize>) {
                self.iter.size_hint()
            }
        }
    };
}
688+
689+
// Per-page minimum values as `i128`; byte-array statistics are decoded with
// `from_bytes_to_i128`.
get_decimal_page_stats_iterator!(
690+
MinDecimal128DataPageStatsIterator,
691+
min,
692+
i128,
693+
from_bytes_to_i128
694+
);
695+
696+
// Per-page maximum values as `i128`; byte-array statistics are decoded with
// `from_bytes_to_i128`.
get_decimal_page_stats_iterator!(
697+
MaxDecimal128DataPageStatsIterator,
698+
max,
699+
i128,
700+
from_bytes_to_i128
701+
);
702+
703+
// Per-page minimum values as `i256`; byte-array statistics are decoded with
// `from_bytes_to_i256`.
get_decimal_page_stats_iterator!(
704+
MinDecimal256DataPageStatsIterator,
705+
min,
706+
i256,
707+
from_bytes_to_i256
708+
);
709+
710+
// Per-page maximum values as `i256`; byte-array statistics are decoded with
// `from_bytes_to_i256`.
get_decimal_page_stats_iterator!(
711+
MaxDecimal256DataPageStatsIterator,
712+
max,
713+
i256,
714+
from_bytes_to_i256
715+
);
603716
make_data_page_stats_iterator!(
604717
MinByteArrayDataPageStatsIterator,
605718
|x: &PageIndex<ByteArray>| { x.min.clone() },
@@ -612,6 +725,7 @@ make_data_page_stats_iterator!(
612725
Index::BYTE_ARRAY,
613726
ByteArray
614727
);
728+
615729
macro_rules! get_data_page_statistics {
616730
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
617731
paste! {
@@ -755,6 +869,10 @@ macro_rules! get_data_page_statistics {
755869
)
756870
)
757871
),
872+
Some(DataType::Decimal128(precision, scale)) => Ok(Arc::new(
873+
Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
874+
Some(DataType::Decimal256(precision, scale)) => Ok(Arc::new(
875+
Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
758876
_ => unimplemented!()
759877
}
760878
}

datafusion/core/tests/parquet/arrow_statistics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1683,7 +1683,7 @@ async fn test_decimal() {
16831683
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
16841684
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
16851685
column_name: "decimal_col",
1686-
check: Check::RowGroup,
1686+
check: Check::Both,
16871687
}
16881688
.run();
16891689
}
@@ -1721,7 +1721,7 @@ async fn test_decimal_256() {
17211721
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
17221722
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
17231723
column_name: "decimal256_col",
1724-
check: Check::RowGroup,
1724+
check: Check::Both,
17251725
}
17261726
.run();
17271727
}

0 commit comments

Comments
 (0)