Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl ByteArrayDecoder {
validate_utf8,
)),
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => ByteArrayDecoder::Dictionary(
ByteArrayDecoderDictionary::new(data, num_levels, num_values),
ByteArrayDecoderDictionary::new(data, num_levels, num_values)?,
),
Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
ByteArrayDecoderDeltaLength::new(data, validate_utf8)?,
Expand Down Expand Up @@ -563,10 +563,10 @@ pub struct ByteArrayDecoderDictionary {
}

impl ByteArrayDecoderDictionary {
fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
Self {
decoder: DictIndexDecoder::new(data, num_levels, num_values),
}
fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> {
Ok(Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified that this is a crate-private structure, so this is not a public API change.

decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
})
}

fn read<I: OffsetSizeTrait>(
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/byte_array_dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ where
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
let bit_width = data[0];
let mut decoder = RleDecoder::new(bit_width);
decoder.set_data(data.slice(1..));
decoder.set_data(data.slice(1..))?;
MaybeDictionaryDecoder::Dict {
decoder,
max_remaining_values: num_values.unwrap_or(num_levels),
Expand Down
10 changes: 5 additions & 5 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl ByteViewArrayDecoder {
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
data, num_levels, num_values,
))
)?)
}
Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength(
ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
Expand Down Expand Up @@ -426,10 +426,10 @@ pub struct ByteViewArrayDecoderDictionary {
}

impl ByteViewArrayDecoderDictionary {
fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
Self {
decoder: DictIndexDecoder::new(data, num_levels, num_values),
}
fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> {
Ok(Self {
decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
})
}

/// Reads the next indexes from self.decoder
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl ColumnValueDecoder for ValueDecoder {
offset: 0,
},
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict {
decoder: DictIndexDecoder::new(data, num_levels, num_values),
decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
},
Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
decoder: DeltaByteArrayDecoder::new(data)?,
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/arrow/decoder/dictionary_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@ pub struct DictIndexDecoder {
impl DictIndexDecoder {
/// Create a new [`DictIndexDecoder`] with the provided data page, the number of levels
/// associated with this data page, and the number of non-null values (if known)
pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> {
let bit_width = data[0];
let mut decoder = RleDecoder::new(bit_width);
decoder.set_data(data.slice(1..));
decoder.set_data(data.slice(1..))?;

Self {
Ok(Self {
decoder,
index_buf: Box::new([0; 1024]),
index_buf_len: 0,
index_offset: 0,
max_remaining_values: num_values.unwrap_or(num_levels),
}
})
}

/// Read up to `len` values, returning the number of values read
Expand Down
7 changes: 4 additions & 3 deletions parquet/src/arrow/record_reader/definition_levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,12 @@ impl DefinitionLevelBufferDecoder {
impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
type Buffer = DefinitionLevelBuffer;

fn set_data(&mut self, encoding: Encoding, data: Bytes) {
fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
match &mut self.decoder {
MaybePacked::Packed(d) => d.set_data(encoding, data),
MaybePacked::Fallback(d) => d.set_data(encoding, data),
}
MaybePacked::Fallback(d) => d.set_data(encoding, data)?,
};
Ok(())
}
}

Expand Down
8 changes: 4 additions & 4 deletions parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ where
self.rep_level_decoder
.as_mut()
.unwrap()
.set_data(rep_level_encoding, level_data);
.set_data(rep_level_encoding, level_data)?;
}

if max_def_level > 0 {
Expand All @@ -466,7 +466,7 @@ where
self.def_level_decoder
.as_mut()
.unwrap()
.set_data(def_level_encoding, level_data);
.set_data(def_level_encoding, level_data)?;
}

self.values_decoder.set_data(
Expand Down Expand Up @@ -512,7 +512,7 @@ where
self.rep_level_decoder.as_mut().unwrap().set_data(
Encoding::RLE,
buf.slice(..rep_levels_byte_len as usize),
);
)?;
}

// DataPage v2 only supports RLE encoding for definition
Expand All @@ -524,7 +524,7 @@ where
rep_levels_byte_len as usize
..(rep_levels_byte_len + def_levels_byte_len) as usize,
),
);
)?;
}

self.values_decoder.set_data(
Expand Down
26 changes: 14 additions & 12 deletions parquet/src/column/reader/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub trait ColumnLevelDecoder {
type Buffer;

/// Set data for this [`ColumnLevelDecoder`]
fn set_data(&mut self, encoding: Encoding, data: Bytes);
fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()>;
}

pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
Expand Down Expand Up @@ -266,15 +266,15 @@ enum LevelDecoder {
}

impl LevelDecoder {
fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Self {
fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Result<Self> {
match encoding {
Encoding::RLE => {
let mut decoder = RleDecoder::new(bit_width);
decoder.set_data(data);
Self::Rle(decoder)
decoder.set_data(data)?;
Ok(Self::Rle(decoder))
}
#[allow(deprecated)]
Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
Encoding::BIT_PACKED => Ok(Self::Packed(BitReader::new(data), bit_width)),
_ => unreachable!("invalid level encoding: {}", encoding),
}
}
Expand Down Expand Up @@ -310,8 +310,9 @@ impl DefinitionLevelDecoderImpl {
impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
type Buffer = Vec<i16>;

fn set_data(&mut self, encoding: Encoding, data: Bytes) {
self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?);
Ok(())
}
}

Expand Down Expand Up @@ -413,10 +414,11 @@ impl RepetitionLevelDecoderImpl {
impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
type Buffer = Vec<i16>;

fn set_data(&mut self, encoding: Encoding, data: Bytes) {
self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?);
self.buffer_len = 0;
self.buffer_offset = 0;
Ok(())
}
}

Expand Down Expand Up @@ -499,14 +501,14 @@ mod tests {
let data = Bytes::from(encoder.consume());

let mut decoder = RepetitionLevelDecoderImpl::new(1);
decoder.set_data(Encoding::RLE, data.clone());
decoder.set_data(Encoding::RLE, data.clone()).unwrap();
let (_, levels) = decoder.skip_rep_levels(100, 4).unwrap();
assert_eq!(levels, 4);

// The length of the final bit packed run is ambiguous, so without the correct
// levels limit, it will decode zero padding
let mut decoder = RepetitionLevelDecoderImpl::new(1);
decoder.set_data(Encoding::RLE, data);
decoder.set_data(Encoding::RLE, data).unwrap();
let (_, levels) = decoder.skip_rep_levels(100, 6).unwrap();
assert_eq!(levels, 6);
}
Expand All @@ -525,7 +527,7 @@ mod tests {
let data = Bytes::from(encoder.consume());

let mut decoder = RepetitionLevelDecoderImpl::new(5);
decoder.set_data(Encoding::RLE, data);
decoder.set_data(Encoding::RLE, data).unwrap();

let total_records = encoded.iter().filter(|x| **x == 0).count();
let mut remaining_records = total_records;
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl<T: DataType> Decoder<T> for DictDecoder<T> {
));
}
let mut rle_decoder = RleDecoder::new(bit_width);
rle_decoder.set_data(data.slice(1..));
rle_decoder.set_data(data.slice(1..))?;
self.num_values = num_values;
self.rle_decoder = Some(rle_decoder);
Ok(())
Expand Down Expand Up @@ -473,7 +473,7 @@ impl<T: DataType> Decoder<T> for RleValueDecoder<T> {

self.decoder = RleDecoder::new(1);
self.decoder
.set_data(data.slice(I32_SIZE..I32_SIZE + data_size));
.set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?;
self.values_left = num_values;
Ok(())
}
Expand Down
Loading
Loading