Commit bdea76c

xinlifoobar authored and appletreeisyellow committed
Update ListingTable to use StatisticsConverter (apache#11068)
* Update ListingTable to use StatisticsConverter
* Complete support for all parquet types
* Fix misc
* Fix test
* Fix none
1 parent f3149b1 commit bdea76c
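
At its core, the change swaps a hand-rolled, per-type match over parquet statistics for the StatisticsConverter API, which materializes row-group statistics as Arrow arrays for any supported column type. A minimal sketch of the pattern, with method names taken from the diff below; the bindings file_schema, schema_descr, and row_groups are assumed to be in scope, and "c1" is a hypothetical column name:

    // Sketch only: one converter per table column.
    let converter = StatisticsConverter::try_new("c1", &file_schema, schema_descr)?;

    // One Arrow array per statistic, one element per row group,
    // with no per-type branching on the caller's side.
    let maxes = converter.row_group_maxes(row_groups)?;
    let mins = converter.row_group_mins(row_groups)?;
    let null_counts = converter.row_group_null_counts(row_groups)?;

These arrays then feed the generic MinAccumulator/MaxAccumulator machinery, which is what the new summarize_min_max_null_counts helper in this diff does.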

File tree

1 file changed (+103, -151 lines):

  • datafusion/core/src/datasource/file_format/parquet.rs

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 103 additions & 151 deletions
@@ -25,25 +25,20 @@ use std::sync::Arc;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 use super::{FileFormat, FileScanConfig};
-use crate::arrow::array::{
-    BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
-};
-use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
+use crate::arrow::array::RecordBatch;
+use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
-use crate::datasource::schema_adapter::{
-    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
-};
 use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::insert::{DataSink, DataSinkExec};
 use crate::physical_plan::{
     Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
     Statistics,
 };
 
+use arrow::compute::sum;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
 use datafusion_common::stats::Precision;
@@ -52,11 +47,13 @@ use datafusion_common::{
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use datafusion_physical_plan::metrics::MetricsSet;
 
 use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
+use log::debug;
 use object_store::buffered::BufWriter;
 use parquet::arrow::arrow_writer::{
     compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
@@ -66,16 +63,17 @@ use parquet::arrow::{
     arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
 };
 use parquet::file::footer::{decode_footer, decode_metadata};
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use parquet::file::properties::WriterProperties;
-use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::file::writer::SerializedFileWriter;
 use parquet::format::FileMetaData;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;
 
-use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
+use crate::datasource::physical_plan::parquet::{
+    ParquetExecBuilder, StatisticsConverter,
+};
 use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use object_store::path::Path;
@@ -295,86 +293,6 @@ impl FileFormat for ParquetFormat {
     }
 }
 
-fn summarize_min_max(
-    max_values: &mut [Option<MaxAccumulator>],
-    min_values: &mut [Option<MinAccumulator>],
-    fields: &Fields,
-    i: usize,
-    stat: &ParquetStatistics,
-) {
-    if !stat.has_min_max_set() {
-        max_values[i] = None;
-        min_values[i] = None;
-        return;
-    }
-    match stat {
-        ParquetStatistics::Boolean(s) if DataType::Boolean == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(BooleanArray::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(BooleanArray::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Int32(s) if DataType::Int32 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Int32Array::from_value(*s.max(), 1))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Int32Array::from_value(*s.min(), 1))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Int64(s) if DataType::Int64 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Int64Array::from_value(*s.max(), 1))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Int64Array::from_value(*s.min(), 1))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Float(s) if DataType::Float32 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Float32Array::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Float32Array::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Double(s) if DataType::Float64 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Float64Array::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Float64Array::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        _ => {
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-    }
-}
-
 /// Fetches parquet metadata from ObjectStore for given object
 ///
 /// This component is a subject to **change** in near future and is exposed for low level integrations
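
Worth noting about the deleted summarize_min_max: it only handled Boolean, Int32, Int64, Float, and Double statistics, and every other physical type fell through to the catch-all arm, discarding the min/max accumulators. The StatisticsConverter introduced below performs this conversion generically, which is what the "complete support for all parquet types" item in the commit message refers to.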
@@ -467,88 +385,115 @@ async fn fetch_statistics(
     statistics_from_parquet_meta(&metadata, table_schema).await
 }
 
-/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
+/// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`]
 ///
 /// The statistics are calculated for each column in the table schema
 /// using the row group statistics in the parquet metadata.
 pub async fn statistics_from_parquet_meta(
     metadata: &ParquetMetaData,
     table_schema: SchemaRef,
 ) -> Result<Statistics> {
-    let file_metadata = metadata.file_metadata();
+    let row_groups_metadata = metadata.row_groups();
 
+    let mut statistics = Statistics::new_unknown(&table_schema);
+    let mut has_statistics = false;
+    let mut num_rows = 0_usize;
+    let mut total_byte_size = 0_usize;
+    for row_group_meta in row_groups_metadata {
+        num_rows += row_group_meta.num_rows() as usize;
+        total_byte_size += row_group_meta.total_byte_size() as usize;
+
+        if !has_statistics {
+            row_group_meta.columns().iter().for_each(|column| {
+                has_statistics = column.statistics().is_some();
+            });
+        }
+    }
+    statistics.num_rows = Precision::Exact(num_rows);
+    statistics.total_byte_size = Precision::Exact(total_byte_size);
+
+    let file_metadata = metadata.file_metadata();
     let file_schema = parquet_to_arrow_schema(
         file_metadata.schema_descr(),
         file_metadata.key_value_metadata(),
     )?;
 
-    let num_fields = table_schema.fields().len();
-    let fields = table_schema.fields();
-
-    let mut num_rows = 0;
-    let mut total_byte_size = 0;
-    let mut null_counts = vec![Precision::Exact(0); num_fields];
-    let mut has_statistics = false;
-
-    let schema_adapter =
-        DefaultSchemaAdapterFactory::default().create(table_schema.clone());
+    statistics.column_statistics = if has_statistics {
+        let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
+        let mut null_counts_array =
+            vec![Precision::Exact(0); table_schema.fields().len()];
 
-    let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
-
-    for row_group_meta in metadata.row_groups() {
-        num_rows += row_group_meta.num_rows();
-        total_byte_size += row_group_meta.total_byte_size();
-
-        let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = HashMap::new();
-
-        for (i, column) in row_group_meta.columns().iter().enumerate() {
-            if let Some(stat) = column.statistics() {
-                has_statistics = true;
-                column_stats.insert(i, (stat.null_count(), stat));
-            }
-        }
-
-        if has_statistics {
-            for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
-                if let Some(file_idx) =
-                    schema_adapter.map_column_index(table_idx, &file_schema)
-                {
-                    if let Some((null_count, stats)) = column_stats.get(&file_idx) {
-                        *null_cnt = null_cnt.add(&Precision::Exact(*null_count as usize));
-                        summarize_min_max(
-                            &mut max_values,
-                            &mut min_values,
-                            fields,
-                            table_idx,
-                            stats,
+        table_schema
+            .fields()
+            .iter()
+            .enumerate()
+            .for_each(|(idx, field)| {
+                match StatisticsConverter::try_new(
+                    field.name(),
+                    &file_schema,
+                    file_metadata.schema_descr(),
+                ) {
+                    Ok(stats_converter) => {
+                        summarize_min_max_null_counts(
+                            &mut min_accs,
+                            &mut max_accs,
+                            &mut null_counts_array,
+                            idx,
+                            num_rows,
+                            &stats_converter,
+                            row_groups_metadata,
                         )
-                    } else {
-                        // If none statistics of current column exists, set the Max/Min Accumulator to None.
-                        max_values[table_idx] = None;
-                        min_values[table_idx] = None;
+                        .ok();
+                    }
+                    Err(e) => {
+                        debug!("Failed to create statistics converter: {}", e);
+                        null_counts_array[idx] = Precision::Exact(num_rows);
                     }
-                } else {
-                    *null_cnt = null_cnt.add(&Precision::Exact(num_rows as usize));
                 }
-            }
-        }
-    }
+            });
 
-    let column_stats = if has_statistics {
-        get_col_stats(&table_schema, null_counts, &mut max_values, &mut min_values)
+        get_col_stats(
+            &table_schema,
+            null_counts_array,
+            &mut max_accs,
+            &mut min_accs,
+        )
     } else {
         Statistics::unknown_column(&table_schema)
     };
 
-    let statistics = Statistics {
-        num_rows: Precision::Exact(num_rows as usize),
-        total_byte_size: Precision::Exact(total_byte_size as usize),
-        column_statistics: column_stats,
-    };
-
     Ok(statistics)
 }
 
+fn summarize_min_max_null_counts(
+    min_accs: &mut [Option<MinAccumulator>],
+    max_accs: &mut [Option<MaxAccumulator>],
+    null_counts_array: &mut [Precision<usize>],
+    arrow_schema_index: usize,
+    num_rows: usize,
+    stats_converter: &StatisticsConverter,
+    row_groups_metadata: &[RowGroupMetaData],
+) -> Result<()> {
+    let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
+    let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
+    let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;
+
+    if let Some(max_acc) = &mut max_accs[arrow_schema_index] {
+        max_acc.update_batch(&[max_values])?;
+    }
+
+    if let Some(min_acc) = &mut min_accs[arrow_schema_index] {
+        min_acc.update_batch(&[min_values])?;
+    }
+
+    null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) {
+        Some(null_count) => null_count as usize,
+        None => num_rows,
+    });
+
+    Ok(())
+}
+
 /// Implements [`DataSink`] for writing to a parquet file.
 pub struct ParquetSink {
     /// Config options for writing data
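
For orientation, a hedged sketch of driving the rewritten entry point; it assumes a ParquetMetaData value has already been loaded (for example via the fetch_parquet_metadata helper documented above) along with a table_schema: SchemaRef:

    // Sketch only: exercise the rewritten statistics path.
    let stats = statistics_from_parquet_meta(&metadata, table_schema.clone()).await?;

    // num_rows and total_byte_size are now always Precision::Exact;
    // per-column min/max are Exact wherever row-group statistics exist.
    assert!(matches!(stats.num_rows, Precision::Exact(_)));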
@@ -1126,7 +1071,8 @@ mod tests {
     use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{Array, ArrayRef, StringArray};
-    use arrow_schema::Field;
+    use arrow_array::Int64Array;
+    use arrow_schema::{DataType, Field};
     use async_trait::async_trait;
    use datafusion_common::cast::{
         as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
@@ -1449,8 +1395,14 @@
         // column c1
         let c1_stats = &stats.column_statistics[0];
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
-        assert_eq!(c1_stats.max_value, Precision::Absent);
-        assert_eq!(c1_stats.min_value, Precision::Absent);
+        assert_eq!(
+            c1_stats.max_value,
+            Precision::Exact(ScalarValue::Utf8(Some("bar".to_string())))
+        );
+        assert_eq!(
+            c1_stats.min_value,
+            Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string())))
+        );
         // column c2: missing from the file so the table treats all 3 rows as null
         let c2_stats = &stats.column_statistics[1];
         assert_eq!(c2_stats.null_count, Precision::Exact(3));
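
These updated assertions capture the user-visible effect of the change: c1 is a Utf8 column, which the old per-type match never covered, so its min/max previously came back as Precision::Absent. With StatisticsConverter the string statistics are read, and the min "Foo" sorting below the max "bar" reflects byte-wise UTF-8 ordering, where uppercase ASCII precedes lowercase.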

0 commit comments