Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion arrow-row/src/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::fixed::{FixedLengthEncoding, FromSlice};
use crate::interner::{Interned, OrderPreservingInterner};
use crate::{null_sentinel, Rows};
use crate::{null_sentinel, Row, Rows};
use arrow_array::builder::*;
use arrow_array::cast::*;
use arrow_array::types::*;
Expand Down Expand Up @@ -56,6 +56,24 @@ pub fn compute_dictionary_mapping(
}
}

/// Encode dictionary values not preserving the dictionary encoding
pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
out: &mut Rows,
column: &DictionaryArray<K>,
values: &Rows,
null: &Row<'_>,
) {
for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
let row = match k {
Some(k) => values.row(k.as_usize()).data,
None => null.data,
};
let end_offset = *offset + row.len();
out.buffer[*offset..end_offset].copy_from_slice(row);
*offset = end_offset;
}
}

/// Dictionary types are encoded as
///
/// - single `0_u8` if null
Expand Down
177 changes: 147 additions & 30 deletions arrow-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ use arrow_schema::*;

use crate::dictionary::{
compute_dictionary_mapping, decode_dictionary, encode_dictionary,
encode_dictionary_values,
};
use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
use crate::interner::OrderPreservingInterner;
Expand Down Expand Up @@ -426,7 +427,14 @@ enum Codec {
/// No additional codec state is necessary
Stateless,
/// The interner used to encode dictionary values
///
/// Used when preserving the dictionary encoding
Dictionary(OrderPreservingInterner),
/// A row converter for the dictionary values
/// and the encoding of a row containing only nulls
///
/// Used when not preserving dictionary encoding
DictionaryValues(RowConverter, OwnedRow),
/// A row converter for the child fields
/// and the encoding of a row containing only nulls
Struct(RowConverter, OwnedRow),
Expand All @@ -437,7 +445,25 @@ enum Codec {
impl Codec {
fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
match &sort_field.data_type {
DataType::Dictionary(_, _) => Ok(Self::Dictionary(Default::default())),
DataType::Dictionary(_, values) => match sort_field.preserve_dictionaries {
true => Ok(Self::Dictionary(Default::default())),
false => {
let sort_field = SortField::new_with_options(
values.as_ref().clone(),
sort_field.options,
);

let mut converter = RowConverter::new(vec![sort_field])?;
let null_array = new_null_array(values.as_ref(), 1);
let nulls = converter.convert_columns(&[null_array])?;

let owned = OwnedRow {
data: nulls.buffer,
config: nulls.config,
};
Ok(Self::DictionaryValues(converter, owned))
}
},
d if !d.is_nested() => Ok(Self::Stateless),
DataType::List(f) | DataType::LargeList(f) => {
// The encoded contents will be inverted if descending is set to true
Expand Down Expand Up @@ -501,6 +527,15 @@ impl Codec {

Ok(Encoder::Dictionary(mapping))
}
Codec::DictionaryValues(converter, nulls) => {
let values = downcast_dictionary_array! {
array => array.values(),
_ => unreachable!()
};

let rows = converter.convert_columns(&[values.clone()])?;
Ok(Encoder::DictionaryValues(rows, nulls.row()))
}
Codec::Struct(converter, null) => {
let v = as_struct_array(array);
let rows = converter.convert_columns(v.columns())?;
Expand All @@ -522,6 +557,9 @@ impl Codec {
match self {
Codec::Stateless => 0,
Codec::Dictionary(interner) => interner.size(),
Codec::DictionaryValues(converter, nulls) => {
converter.size() + nulls.data.len()
}
Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
Codec::List(converter) => converter.size(),
}
Expand All @@ -534,6 +572,8 @@ enum Encoder<'a> {
Stateless,
/// The mapping from dictionary keys to normalized keys
Dictionary(Vec<Option<&'a [u8]>>),
/// The encoding of the child array and the encoding of a null row
DictionaryValues(Rows, Row<'a>),
/// The row encoding of the child arrays and the encoding of a null row
///
/// It is necessary to encode to a temporary [`Rows`] to avoid serializing
Expand All @@ -551,6 +591,8 @@ pub struct SortField {
options: SortOptions,
/// Data type
data_type: DataType,
/// Preserve dictionaries
preserve_dictionaries: bool,
}

impl SortField {
Expand All @@ -561,7 +603,28 @@ impl SortField {

/// Create a new column with the given data type and [`SortOptions`]
pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
Self { options, data_type }
Self {
options,
data_type,
preserve_dictionaries: true,
}
}

/// By default dictionaries are preserved as described on [`RowConverter`]
///
/// However, this process requires maintaining and incrementally updating
/// an order-preserving mapping of dictionary values which is relatively expensive
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// an order-preserving mapping of dictionary values which is relatively expensive
/// an order-preserving mapping of dictionary values which is relatively expensive
/// computationally but minimizes memory used.

///
/// Some applications may wish to instead trade-off space efficiency, for improved
/// performance, by instead encoding dictionary values directly
///
/// When `preserve_dictionaries` is true, fields will instead be encoded as their
/// underlying value, reversing any dictionary encoding
pub fn preserve_dictionaries(self, preserve_dictionaries: bool) -> Self {
Self {
preserve_dictionaries,
..self
}
}

/// Return size of this instance in bytes.
Expand Down Expand Up @@ -1045,6 +1108,19 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) ->
_ => unreachable!(),
}
}
Encoder::DictionaryValues(values, null) => {
downcast_dictionary_array! {
array => {
for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
*length += match v {
Some(k) => values.row(k.as_usize()).data.len(),
None => null.data.len(),
}
}
}
_ => unreachable!(),
}
}
Encoder::Struct(rows, null) => {
let array = as_struct_array(array);
lengths.iter_mut().enumerate().for_each(|(idx, length)| {
Expand Down Expand Up @@ -1143,6 +1219,12 @@ fn encode_column(
_ => unreachable!()
}
}
Encoder::DictionaryValues(values, nulls) => {
downcast_dictionary_array! {
column => encode_dictionary_values(out, column, values, nulls),
_ => unreachable!()
}
}
Encoder::Struct(rows, null) => {
let array = as_struct_array(column);
let null_sentinel = null_sentinel(opts);
Expand Down Expand Up @@ -1221,6 +1303,10 @@ unsafe fn decode_column(
_ => unreachable!()
}
}
Codec::DictionaryValues(converter, _) => {
let cols = converter.convert_raw(rows, validate_utf8)?;
cols.into_iter().next().unwrap()
}
Codec::Struct(converter, _) => {
let (null_count, nulls) = fixed::decode_nulls(rows);
rows.iter_mut().for_each(|row| *row = &row[1..]);
Expand Down Expand Up @@ -1557,8 +1643,25 @@ mod tests {
assert_eq!(&cols[0], &col);
}

/// If `exact` is false performs a logical comparison between a and dictionary-encoded b
fn dictionary_eq(exact: bool, a: &dyn Array, b: &dyn Array) {
match b.data_type() {
DataType::Dictionary(_, v) if !exact => {
assert_eq!(a.data_type(), v.as_ref());
let b = arrow_cast::cast(b, v).unwrap();
assert_eq!(a.data(), b.data())
}
_ => assert_eq!(a.data(), b.data()),
}
}

#[test]
fn test_string_dictionary() {
test_string_dictionary_preserve(false);
test_string_dictionary_preserve(true);
}

fn test_string_dictionary_preserve(preserve: bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the repeated preserve was a little confusing at first -- as I would expect test_string_dictionary_preserve to mean testing preserve = true.

totally minor nitpick but maybe it could be renamed to test_string_dictionary

let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
Some("foo"),
Some("hello"),
Expand All @@ -1570,8 +1673,8 @@ mod tests {
Some("hello"),
])) as ArrayRef;

let mut converter =
RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
let field = SortField::new(a.data_type().clone()).preserve_dictionaries(preserve);
let mut converter = RowConverter::new(vec![field]).unwrap();
let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

assert!(rows_a.row(3) < rows_a.row(5));
Expand All @@ -1584,7 +1687,7 @@ mod tests {
assert_eq!(rows_a.row(1), rows_a.row(7));

let cols = converter.convert_rows(&rows_a).unwrap();
assert_eq!(&cols[0], &a);
dictionary_eq(preserve, &cols[0], &a);

let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
Some("hello"),
Expand All @@ -1598,15 +1701,16 @@ mod tests {
assert!(rows_b.row(2) < rows_a.row(0));

let cols = converter.convert_rows(&rows_b).unwrap();
assert_eq!(&cols[0], &b);
dictionary_eq(preserve, &cols[0], &b);

let mut converter = RowConverter::new(vec![SortField::new_with_options(
a.data_type().clone(),
SortOptions {
descending: true,
nulls_first: false,
},
)])
)
.preserve_dictionaries(preserve)])
.unwrap();

let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
Expand All @@ -1616,15 +1720,16 @@ mod tests {
assert!(rows_c.row(3) > rows_c.row(0));

let cols = converter.convert_rows(&rows_c).unwrap();
assert_eq!(&cols[0], &a);
dictionary_eq(preserve, &cols[0], &a);

let mut converter = RowConverter::new(vec![SortField::new_with_options(
a.data_type().clone(),
SortOptions {
descending: true,
nulls_first: true,
},
)])
)
.preserve_dictionaries(preserve)])
.unwrap();

let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
Expand All @@ -1634,7 +1739,7 @@ mod tests {
assert!(rows_c.row(3) < rows_c.row(0));

let cols = converter.convert_rows(&rows_c).unwrap();
assert_eq!(&cols[0], &a);
dictionary_eq(preserve, &cols[0], &a);
}

#[test]
Expand Down Expand Up @@ -1694,15 +1799,19 @@ mod tests {
builder.append(-1).unwrap();

let a = builder.finish();

let mut converter =
RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
let rows = converter.convert_columns(&[Arc::new(a)]).unwrap();
assert!(rows.row(0) < rows.row(1));
assert!(rows.row(2) < rows.row(0));
assert!(rows.row(3) < rows.row(2));
assert!(rows.row(6) < rows.row(2));
assert!(rows.row(3) < rows.row(6));
let data_type = a.data_type().clone();
let columns = [Arc::new(a) as ArrayRef];

for preserve in [true, false] {
let field = SortField::new(data_type.clone()).preserve_dictionaries(preserve);
let mut converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&columns).unwrap();
assert!(rows.row(0) < rows.row(1));
assert!(rows.row(2) < rows.row(0));
assert!(rows.row(3) < rows.row(2));
assert!(rows.row(6) < rows.row(2));
assert!(rows.row(3) < rows.row(6));
}
}

#[test]
Expand All @@ -1722,15 +1831,17 @@ mod tests {
.build()
.unwrap();

let mut converter = RowConverter::new(vec![SortField::new(data_type)]).unwrap();
let rows = converter
.convert_columns(&[Arc::new(DictionaryArray::<Int32Type>::from(data))])
.unwrap();
let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
for preserve in [true, false] {
let field = SortField::new(data_type.clone()).preserve_dictionaries(preserve);
let mut converter = RowConverter::new(vec![field]).unwrap();
let rows = converter.convert_columns(&columns).unwrap();

assert_eq!(rows.row(0), rows.row(1));
assert_eq!(rows.row(3), rows.row(4));
assert_eq!(rows.row(4), rows.row(5));
assert!(rows.row(3) < rows.row(0));
assert_eq!(rows.row(0), rows.row(1));
assert_eq!(rows.row(3), rows.row(4));
assert_eq!(rows.row(4), rows.row(5));
assert!(rows.row(3) < rows.row(0));
}
}

#[test]
Expand Down Expand Up @@ -2129,12 +2240,18 @@ mod tests {
})
.collect();

let preserve: Vec<_> = (0..num_columns).map(|_| rng.gen_bool(0.5)).collect();

let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

let columns = options
.into_iter()
.zip(&arrays)
.map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
.zip(&preserve)
.map(|((o, a), p)| {
SortField::new_with_options(a.data_type().clone(), o)
.preserve_dictionaries(*p)
})
.collect();

let mut converter = RowConverter::new(columns).unwrap();
Expand All @@ -2160,9 +2277,9 @@ mod tests {
}

let back = converter.convert_rows(&rows).unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
for ((actual, expected), preserve) in back.iter().zip(&arrays).zip(preserve) {
actual.data().validate_full().unwrap();
assert_eq!(actual, expected)
dictionary_eq(preserve, actual, expected)
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these tests seem to verify that the values are the same. I wonder if it would be valuable to cover space efficiency too in a test (like encode using preserved and non preserved dictionaries and show them taking up different sizes 🤔 )

Expand Down
Loading