Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions parquet_derive/src/parquet_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,13 @@ impl Field {
let write_batch_expr = quote! {
let mut vals = Vec::new();
if let #column_reader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
let mut definition_levels = Vec::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please also update the documentation above

    /// `Option` types and references not supported

To reflect this change?

Copy link
Contributor Author

@double-free double-free Jun 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can make it explicit, but to align our understanding, I want to note that this change does NOT mean it supports Option<T> in the struct to be decoded.
It is to support fields that are marked as "optional" in the schema but whose values are in fact all non-null. This is very common when parsing parquet files generated by pyarrow, since by default all of its columns are optional. I reproduced the issue in a unit test by adding Option<T> fields to the struct being encoded.

So this comment is still true, but I can make it explicit for the case I mentioned above:

    /// `Option` types and references not supported, but the column itself can be nullable (i.e., def_level==1), as long as the values are all valid.

What do you think — should I update the comment, or keep it as it is?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you

let (total_num, valid_num, decoded_num) = typed.read_records(
num_records, Some(&mut definition_levels), None, &mut vals)?;
if valid_num != decoded_num {
panic!("Support only valid records, found {} null records in column type {}",
decoded_num - valid_num, stringify!{#ident});
}
} else {
panic!("Schema and struct disagree on type for {}", stringify!{#ident});
}
Expand Down Expand Up @@ -876,15 +882,21 @@ mod test {
snippet,
(quote! {
{
let mut vals = Vec::new();
if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ counter });
}
for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
r.counter = vals[i] as usize;
}
let mut vals = Vec::new();
if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
let mut definition_levels = Vec::new();
let (total_num, valid_num, decoded_num) = typed.read_records(
num_records, Some(&mut definition_levels), None, &mut vals)?;
if valid_num != decoded_num {
panic!("Support only valid records, found {} null records in column type {}",
decoded_num - valid_num, stringify!{counter});
}
} else {
panic!("Schema and struct disagree on type for {}", stringify!{counter});
}
for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
r.counter = vals[i] as usize;
}
}
})
.to_string()
Expand Down Expand Up @@ -1291,7 +1303,13 @@ mod test {
{
let mut vals = Vec::new();
if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
let mut definition_levels = Vec::new();
let (total_num, valid_num, decoded_num) = typed.read_records(
num_records, Some(&mut definition_levels), None, &mut vals)?;
if valid_num != decoded_num {
panic!("Support only valid records, found {} null records in column type {}",
decoded_num - valid_num, stringify!{henceforth});
}
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
}
Expand Down Expand Up @@ -1359,7 +1377,13 @@ mod test {
{
let mut vals = Vec::new();
if let ColumnReader::Int32ColumnReader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
let mut definition_levels = Vec::new();
let (total_num, valid_num, decoded_num) = typed.read_records(
num_records, Some(&mut definition_levels), None, &mut vals)?;
if valid_num != decoded_num {
panic!("Support only valid records, found {} null records in column type {}",
decoded_num - valid_num, stringify!{henceforth});
}
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
}
Expand Down Expand Up @@ -1427,7 +1451,13 @@ mod test {
{
let mut vals = Vec::new();
if let ColumnReader::FixedLenByteArrayColumnReader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
let mut definition_levels = Vec::new();
let (total_num, valid_num, decoded_num) = typed.read_records(
num_records, Some(&mut definition_levels), None, &mut vals)?;
if valid_num != decoded_num {
panic!("Support only valid records, found {} null records in column type {}",
decoded_num - valid_num, stringify!{unique_id});
}
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ unique_id });
}
Expand Down
58 changes: 58 additions & 0 deletions parquet_derive_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ struct APartiallyCompleteRecord {
pub byte_vec: Vec<u8>,
}

// Mirror of `APartiallyCompleteRecord` in which three columns are declared
// OPTIONAL in the written parquet schema (the `Option<T>` fields below become
// nullable columns). As long as every value written is actually non-null
// (all `Some(..)`), a file produced from this struct can still be read back
// into the non-optional `APartiallyCompleteRecord`.
#[derive(PartialEq, ParquetRecordWriter, Debug)]
struct APartiallyOptionalRecord {
pub bool: bool,
pub string: String,
// Nullable columns in the schema; values must all be non-null to be
// readable into the non-optional counterpart struct.
pub maybe_i16: Option<i16>,
pub maybe_i32: Option<i32>,
pub maybe_u64: Option<u64>,
pub isize: isize,
pub float: f32,
pub double: f64,
pub now: chrono::NaiveDateTime,
pub date: chrono::NaiveDate,
pub byte_vec: Vec<u8>,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -218,6 +236,46 @@ mod tests {
assert_eq!(drs[0], out[0]);
}

#[test]
fn test_parquet_derive_read_optional_but_valid_column() {
    let file = get_temp_file("test_parquet_derive_read_optional", &[]);
    // Every `Option` field is `Some(..)`: the columns are OPTIONAL in the
    // written schema, but no value is actually null, so reading them into
    // the non-optional `APartiallyCompleteRecord` must succeed.
    let drs: Vec<APartiallyOptionalRecord> = vec![APartiallyOptionalRecord {
        bool: true,
        string: "a string".into(),
        maybe_i16: Some(-45),
        maybe_i32: Some(456),
        maybe_u64: Some(4563424),
        isize: -365,
        float: 3.5,
        // `f64::NAN` (associated const) instead of the deprecated
        // `std::f64::NAN` module path. NaN is fine here: the assertions
        // below never compare the `double` column.
        double: f64::NAN,
        now: chrono::Utc::now().naive_local(),
        date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
        byte_vec: vec![0x65, 0x66, 0x67],
    }];

    // Write one row group using the schema derived from the optional struct.
    let generated_schema = drs.as_slice().schema().unwrap();

    let props = Default::default();
    let mut writer =
        SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap();

    let mut row_group = writer.next_row_group().unwrap();
    drs.as_slice().write_to_row_group(&mut row_group).unwrap();
    row_group.close().unwrap();
    writer.close().unwrap();

    // Read the file back into the NON-optional struct: this exercises the
    // "optional column, all values valid" code path in the derived reader.
    use parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader};
    let reader = SerializedFileReader::new(file).unwrap();
    let mut out: Vec<APartiallyCompleteRecord> = Vec::new();

    let mut row_group = reader.get_row_group(0).unwrap();
    out.read_from_row_group(&mut *row_group, 1).unwrap();

    // Only the formerly-optional columns are checked; they must round-trip
    // into the plain (non-Option) fields.
    assert_eq!(drs[0].maybe_i16.unwrap(), out[0].i16);
    assert_eq!(drs[0].maybe_i32.unwrap(), out[0].i32);
    assert_eq!(drs[0].maybe_u64.unwrap(), out[0].u64);
}

/// Returns file handle for a temp file in 'target' directory with a provided content
pub fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File {
// build tmp path to a file in "target/debug/testdata"
Expand Down