Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions parquet/src/arrow/buffer/bit_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
use std::ops::Range;

#[allow(unused)]
/// Counts the number of set bits in the provided range
pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
let unaligned = UnalignedBitChunk::new(bytes, range.start, range.end - range.start);
Expand Down
57 changes: 41 additions & 16 deletions parquet/src/arrow/record_reader/definition_levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use arrow_buffer::Buffer;
use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
use bytes::Bytes;

use crate::arrow::buffer::bit_util::count_set_bits;
use crate::basic::Encoding;
use crate::column::reader::decoder::{
ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
Expand Down Expand Up @@ -169,10 +168,7 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
(BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
assert_eq!(self.max_level, 1);

let start = nulls.len();
let levels_read = decoder.read(nulls, num_levels)?;

let values_read = count_set_bits(nulls.as_slice(), start..start + levels_read);
let (values_read, levels_read) = decoder.read(nulls, num_levels)?;
Ok((values_read, levels_read))
}
_ => unreachable!("inconsistent null mask"),
Expand Down Expand Up @@ -284,20 +280,31 @@ impl PackedDecoder {
self.data_offset = 0;
}

fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> {
let mut read = 0;
while read != len {
/// Reads up to `len` definition levels directly into a boolean bitmask.
///
/// Returns a tuple of `(values_read, levels_read)`, where `values_read` counts the
/// number of `true` bits appended to `buffer`.
fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<(usize, usize)> {
let mut levels_read = 0;
let mut values_read = 0;
while levels_read != len {
if self.rle_left != 0 {
let to_read = self.rle_left.min(len - read);
let to_read = self.rle_left.min(len - levels_read);
buffer.append_n(to_read, self.rle_value);
self.rle_left -= to_read;
read += to_read;
if self.rle_value {
values_read += to_read;
}
levels_read += to_read;
} else if self.packed_count != self.packed_offset {
let to_read = (self.packed_count - self.packed_offset).min(len - read);
let to_read = (self.packed_count - self.packed_offset).min(len - levels_read);
let offset = self.data_offset * 8 + self.packed_offset;
buffer.append_packed_range(offset..offset + to_read, self.data.as_ref());
// Packed runs already encode bits densely; count the ones we just appended.
values_read +=
UnalignedBitChunk::new(self.data.as_ref(), offset, to_read).count_ones();
self.packed_offset += to_read;
read += to_read;
levels_read += to_read;

if self.packed_offset == self.packed_count {
self.data_offset += self.packed_count / 8;
Expand All @@ -308,7 +315,7 @@ impl PackedDecoder {
self.next_rle_block()?
}
}
Ok(read)
Ok((values_read, levels_read))
}

/// Skips `level_num` definition levels
Expand Down Expand Up @@ -360,10 +367,14 @@ mod tests {

let mut expected = BooleanBufferBuilder::new(len);
let mut encoder = RleEncoder::new(1, 1024);
let mut expected_value_count = 0;
for _ in 0..len {
let bool = rng.random_bool(0.8);
encoder.put(bool as u64);
expected.append(bool);
if bool {
expected_value_count += 1;
}
}
assert_eq!(expected.len(), len);

Expand All @@ -373,18 +384,27 @@ mod tests {

// Decode data in random length intervals
let mut decoded = BooleanBufferBuilder::new(len);
// Track how many `true` bits we appended to validate the returned counts.
let mut decoded_value_count = 0;
loop {
let remaining = len - decoded.len();
if remaining == 0 {
break;
}

let to_read = rng.random_range(1..=remaining);
decoder.read(&mut decoded, to_read).unwrap();
let offset = decoded.len();
let (values_read, levels_read) = decoder.read(&mut decoded, to_read).unwrap();
assert_eq!(levels_read, to_read);
decoded_value_count += values_read;
let expected_chunk_ones =
UnalignedBitChunk::new(expected.as_slice(), offset, levels_read).count_ones();
assert_eq!(values_read, expected_chunk_ones);
}

assert_eq!(decoded.len(), len);
assert_eq!(decoded.as_slice(), expected.as_slice());
assert_eq!(decoded_value_count, expected_value_count);
}

#[test]
Expand Down Expand Up @@ -428,18 +448,23 @@ mod tests {
skip_level += skip_level_num
} else {
let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level);
let read_level_num = decoder.read(&mut decoded, to_read_or_skip_level).unwrap();
let (read_value_num, read_level_num) =
decoder.read(&mut decoded, to_read_or_skip_level).unwrap();
read_level += read_level_num;
read_value += read_value_num;
// Verify the per-chunk counts match the exact bits we compared below.
let mut chunk_value_count = 0;
for i in 0..read_level_num {
assert!(!decoded.is_empty());
//check each read bit
let read_bit = decoded.get_bit(i);
if read_bit {
read_value += 1;
chunk_value_count += 1;
}
let expect_bit = expected.get_bit(i + offset);
assert_eq!(read_bit, expect_bit);
}
assert_eq!(chunk_value_count, read_value_num);
}
}
assert_eq!(read_level + skip_level, len);
Expand Down
102 changes: 78 additions & 24 deletions parquet/src/column/reader/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use bytes::Bytes;

use crate::basic::Encoding;
Expand Down Expand Up @@ -68,9 +66,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
}

pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
/// Read up to `num_levels` definition levels into `out`
/// Read up to `num_levels` definition levels into `out`.
///
/// Returns the number of values skipped, and the number of levels skipped
/// Returns the number of values read, and the number of levels read.
///
/// # Panics
///
Expand All @@ -81,9 +79,9 @@ pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
num_levels: usize,
) -> Result<(usize, usize)>;

/// Skips over `num_levels` definition levels
/// Skips over `num_levels` definition levels.
///
/// Returns the number of values skipped, and the number of levels skipped
/// Returns the number of values skipped, and the number of levels skipped.
fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
}

Expand Down Expand Up @@ -136,14 +134,75 @@ pub trait ColumnValueDecoder {
fn skip_values(&mut self, num_values: usize) -> Result<usize>;
}

/// Number of distinct slots addressable by [`enc_slot`].
///
/// A fixed-size array indexed by encoding replaces `HashMap` lookups, avoiding
/// hashing overhead on the hot decoding paths.
const ENCODING_SLOTS: usize = 10;

/// Maps an [`Encoding`] to its bucket index in `0..ENCODING_SLOTS`.
///
/// NOTE(review): slot 1 is intentionally unused; the numbering appears to
/// mirror the Parquet specification's encoding ids — confirm before reusing it.
#[inline]
fn enc_slot(encoding: Encoding) -> usize {
    match encoding {
        // Data-page encodings
        Encoding::PLAIN => 0,
        Encoding::RLE => 3,
        #[allow(deprecated)]
        Encoding::BIT_PACKED => 4,
        Encoding::DELTA_BINARY_PACKED => 5,
        Encoding::DELTA_LENGTH_BYTE_ARRAY => 6,
        Encoding::DELTA_BYTE_ARRAY => 7,
        Encoding::BYTE_STREAM_SPLIT => 9,
        // Dictionary encodings
        Encoding::PLAIN_DICTIONARY => 2,
        Encoding::RLE_DICTIONARY => 8,
    }
}

/// Fixed-capacity storage for decoder instances keyed by Parquet encoding.
///
/// Each [`Encoding`] maps to a dedicated array slot via [`enc_slot`], so
/// lookups are a direct index instead of a `HashMap` hash-and-probe.
struct DecoderBuckets<V> {
    /// One optional entry per supported encoding; index is `enc_slot(encoding)`.
    inner: [Option<V>; ENCODING_SLOTS],
}

impl<V> DecoderBuckets<V> {
    /// Creates an empty set of buckets (every slot unoccupied).
    #[inline]
    fn new() -> Self {
        Self {
            // `V` need not be `Copy` or `Default`, so build the array element-wise.
            inner: std::array::from_fn(|_| None),
        }
    }

    /// Returns `true` if a decoder has been stored for `e`.
    #[inline]
    fn contains_key(&self, e: Encoding) -> bool {
        self.inner[enc_slot(e)].is_some()
    }

    /// Returns a mutable reference to the decoder for `e`, if present.
    #[inline]
    fn get_mut(&mut self, e: Encoding) -> Option<&mut V> {
        self.inner[enc_slot(e)].as_mut()
    }

    /// Stores `v` for `e` and returns a mutable reference to the stored value.
    ///
    /// Callers are expected to insert each encoding at most once; debug builds
    /// check this, release builds would silently replace an existing value.
    #[inline]
    fn insert_and_get_mut(&mut self, e: Encoding, v: V) -> &mut V {
        let slot = &mut self.inner[enc_slot(e)];
        debug_assert!(slot.is_none(), "decoder slot inserted twice");
        // `Option::insert` writes the value and returns `&mut V` directly,
        // replacing the previous `*slot = Some(v); slot.as_mut().unwrap()` dance.
        slot.insert(v)
    }
}

impl<V> Default for DecoderBuckets<V> {
    fn default() -> Self {
        Self::new()
    }
}

/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
pub struct ColumnValueDecoderImpl<T: DataType> {
descr: ColumnDescPtr,

current_encoding: Option<Encoding>,

// Cache of decoders for existing encodings
decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
/// Cache of decoders for existing encodings.
/// Uses `DecoderBuckets` instead of `HashMap` for predictable indexing.
decoders: DecoderBuckets<Box<dyn Decoder<T>>>,
}

impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
Expand All @@ -153,7 +212,7 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
Self {
descr: descr.clone(),
current_encoding: None,
decoders: Default::default(),
decoders: DecoderBuckets::new(),
}
}

Expand All @@ -168,7 +227,7 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
encoding = Encoding::RLE_DICTIONARY
}

if self.decoders.contains_key(&encoding) {
if self.decoders.contains_key(encoding) {
return Err(general_err!("Column cannot have more than one dictionary"));
}

Expand All @@ -178,7 +237,8 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {

let mut decoder = DictDecoder::new();
decoder.set_dict(Box::new(dictionary))?;
self.decoders.insert(encoding, Box::new(decoder));
self.decoders
.insert_and_get_mut(encoding, Box::new(decoder));
Ok(())
} else {
Err(nyi_err!(
Expand All @@ -195,25 +255,19 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
num_levels: usize,
num_values: Option<usize>,
) -> Result<()> {
use std::collections::hash_map::Entry;

if encoding == Encoding::PLAIN_DICTIONARY {
encoding = Encoding::RLE_DICTIONARY;
}

let decoder = if encoding == Encoding::RLE_DICTIONARY {
self.decoders
.get_mut(&encoding)
.get_mut(encoding)
.expect("Decoder for dict should have been set")
} else if let Some(decoder) = self.decoders.get_mut(encoding) {
decoder
} else {
// Search cache for data page decoder
match self.decoders.entry(encoding) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(v) => {
let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
v.insert(data_decoder)
}
}
let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
self.decoders.insert_and_get_mut(encoding, data_decoder)
};

decoder.set_data(data, num_values.unwrap_or(num_levels))?;
Expand All @@ -228,7 +282,7 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {

let current_decoder = self
.decoders
.get_mut(&encoding)
.get_mut(encoding)
.unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));

// TODO: Push vec into decoder (#5177)
Expand All @@ -246,7 +300,7 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {

let current_decoder = self
.decoders
.get_mut(&encoding)
.get_mut(encoding)
.unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));

current_decoder.skip(num_values)
Expand Down
Loading