Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 120 additions & 95 deletions Cargo.lock

Large diffs are not rendered by default.

55 changes: 28 additions & 27 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ anyhow = "1.0.95"
arbitrary = "1.3.2"
arcref = "0.2.0"
arrayref = "0.3.7"
arrow-arith = "55.2.0"
arrow-array = "55.2.0"
arrow-buffer = "55.2.0"
arrow-cast = "55.2.0"
arrow-data = "55.2.0"
arrow-ipc = "55.2.0"
arrow-ord = "55.2.0"
arrow-schema = "55.2.0"
arrow-select = "55.2.0"
arrow-string = "55.2.0"
arrow-arith = "56"
arrow-array = "56"
arrow-buffer = "56"
arrow-cast = "56"
arrow-data = "56"
arrow-ipc = "56"
arrow-ord = "56"
arrow-schema = "56"
arrow-select = "56"
arrow-string = "56"
async-compat = "0.2.5"
async-stream = "0.3.6"
async-trait = "0.1.88"
async-trait = "0.1.89"
bindgen = "0.72.0"
bit-vec = "0.8.0"
bitvec = "1.0.1"
Expand All @@ -86,15 +86,17 @@ crossbeam-deque = "0.8.6"
crossbeam-queue = "0.3.12"
crossterm = "0.29"
dashmap = "6.1.0"
datafusion = { version = "49", default-features = false }
datafusion-catalog = { version = "49" }
datafusion-common = { version = "49" }
datafusion-common-runtime = { version = "49" }
datafusion-datasource = { version = "49", default-features = false }
datafusion-execution = { version = "49" }
datafusion-expr = { version = "49" }
datafusion-physical-expr = { version = "49" }
datafusion-physical-plan = { version = "49" }
datafusion = { version = "50", default-features = false }
datafusion-catalog = { version = "50" }
datafusion-common = { version = "50" }
datafusion-common-runtime = { version = "50" }
datafusion-datasource = { version = "50", default-features = false }
datafusion-execution = { version = "50" }
datafusion-expr = { version = "50" }
datafusion-physical-expr = { version = "50" }
datafusion-physical-expr-adapter = { version = "50" }
datafusion-physical-expr-common = { version = "50" }
datafusion-physical-plan = { version = "50" }
dirs = "6.0.0"
divan = { package = "codspeed-divan-compat", version = "3.0" }
dyn-hash = "0.2.0"
Expand Down Expand Up @@ -135,7 +137,7 @@ opentelemetry = "0.30.0"
opentelemetry-otlp = "0.30.0"
opentelemetry_sdk = "0.30.0"
parking_lot = { version = "0.12.3", features = ["nightly"] }
parquet = "55.2.0"
parquet = "56"
paste = "1.0.15"
pco = "0.4.4"
pin-project = "1.1.5"
Expand Down Expand Up @@ -181,17 +183,16 @@ target-lexicon = "0.13"
tempfile = "3"
termtree = { version = "0.5" }
thiserror = "2.0.3"
tokio = { version = "1.46" }
tokio = { version = "1.47" }
tokio-stream = "0.1.17"
tokio-util = { version = "0.7.16" }
# replace these with releases
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "d849ff430cd52250f6891ed4d5e3adad77bb2698" }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "d849ff430cd52250f6891ed4d5e3adad77bb2698" }
tpchgen = { version = "2" }
tpchgen-arrow = { version = "2" }
tracing = { version = "0.1.41" }
tracing-perfetto = "0.1.5"
tracing-subscriber = "0.3.20"
url = "2.5.4"
uuid = { version = "1.17", features = ["js"] }
url = "2.5.7"
uuid = { version = "1.18", features = ["js"] }
walkdir = "2.5.0"
wasm-bindgen-futures = "0.4.39"
witchcraft-metrics = "1.0.1"
Expand Down
1 change: 1 addition & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ datafusion = { workspace = true, features = [
"parquet",
"datetime_expressions",
"nested_expressions",
"unicode_expressions",
] }
datafusion-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
Expand Down
168 changes: 168 additions & 0 deletions vortex-array/src/arrow/compute/to_arrow/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use arrow_array::types::{
};
use arrow_array::{
Array, ArrayRef as ArrowArrayRef, ArrowPrimitiveType, BooleanArray as ArrowBoolArray,
Decimal32Array as ArrowDecimal32Array, Decimal64Array as ArrowDecimal64Array,
Decimal128Array as ArrowDecimal128Array, Decimal256Array as ArrowDecimal256Array,
FixedSizeListArray as ArrowFixedSizeListArray, GenericByteArray, GenericByteViewArray,
GenericListArray, NullArray as ArrowNullArray, OffsetSizeTrait,
Expand Down Expand Up @@ -116,6 +117,34 @@ impl Kernel for ToArrowCanonical {
{
to_arrow_primitive::<Float64Type>(array)
}
(Canonical::Decimal(array), DataType::Decimal32(precision, scale)) => {
if array.decimal_dtype().precision() != *precision
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check too restrictive? Why not check whether the target type can hold at least the same precision and scale? Maybe I'm misreading something here.

array.decimal_dtype().precision() <= *precision
array.decimal_dtype().scale() <= *scale

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in bail if array.decimal_dtype().precision() > *precision.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I agree, I'll think about it a bit more but probably include it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After spending some more time with with arrow's and datafusion's decimal-handling code, I think we shouldn't change precision and scale here. Keeping it this way guarantees predictable behavior and zero-copy, unless explicitly cast.

|| array.decimal_dtype().scale() != *scale
{
vortex_bail!(
"ToArrowCanonical: target precision/scale {}/{} does not match array precision/scale {}/{}",
precision,
scale,
array.decimal_dtype().precision(),
array.decimal_dtype().scale()
);
}
to_arrow_decimal32(array)
}
(Canonical::Decimal(array), DataType::Decimal64(precision, scale)) => {
if array.decimal_dtype().precision() != *precision
|| array.decimal_dtype().scale() != *scale
{
vortex_bail!(
"ToArrowCanonical: target precision/scale {}/{} does not match array precision/scale {}/{}",
precision,
scale,
array.decimal_dtype().precision(),
array.decimal_dtype().scale()
);
}
to_arrow_decimal64(array)
}
(Canonical::Decimal(array), DataType::Decimal128(precision, scale)) => {
if array.decimal_dtype().precision() != *precision
|| array.decimal_dtype().scale() != *scale
Expand Down Expand Up @@ -223,6 +252,91 @@ fn to_arrow_primitive<T: ArrowPrimitiveType>(array: PrimitiveArray) -> VortexRes
)))
}

/// Converts a Vortex [`DecimalArray`] into an Arrow `Decimal32Array`.
///
/// Values stored in a narrower integer type are widened losslessly; values
/// stored in a wider type are narrowed and the conversion fails if any value
/// does not fit in an `i32`. The i32-backed case is zero-copy.
fn to_arrow_decimal32(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
    let validity = array.validity_mask().to_null_buffer();
    let (precision, scale) = (
        array.decimal_dtype().precision(),
        array.decimal_dtype().scale(),
    );
    let values: Buffer<i32> = match array.values_type() {
        // Widening conversions: always safe.
        DecimalValueType::I8 => {
            Buffer::from_trusted_len_iter(array.buffer::<i8>().into_iter().map(|v| v.as_()))
        }
        DecimalValueType::I16 => {
            Buffer::from_trusted_len_iter(array.buffer::<i16>().into_iter().map(|v| v.as_()))
        }
        // Same width: reuse the existing buffer without copying.
        DecimalValueType::I32 => array.buffer::<i32>(),
        // Narrowing conversions: bail on the first value that overflows i32.
        DecimalValueType::I64 => array
            .buffer::<i64>()
            .into_iter()
            .map(|v| {
                v.to_i32()
                    .ok_or_else(|| vortex_err!("i64 to i32 narrowing cannot be done safely"))
            })
            .process_results(|it| Buffer::from_trusted_len_iter(it))?,
        DecimalValueType::I128 => array
            .buffer::<i128>()
            .into_iter()
            .map(|v| {
                v.to_i32()
                    .ok_or_else(|| vortex_err!("i128 to i32 narrowing cannot be done safely"))
            })
            .process_results(|it| Buffer::from_trusted_len_iter(it))?,
        DecimalValueType::I256 => array
            .buffer::<vortex_scalar::i256>()
            .into_iter()
            .map(|v| {
                v.to_i32()
                    .ok_or_else(|| vortex_err!("i256 to i32 narrowing cannot be done safely"))
            })
            .process_results(|it| Buffer::from_trusted_len_iter(it))?,
        // DecimalValueType is extensible; reject anything we don't recognize.
        _ => vortex_bail!("unknown value type {:?}", array.values_type()),
    };
    Ok(Arc::new(
        ArrowDecimal32Array::new(values.into_arrow_scalar_buffer(), validity)
            .with_precision_and_scale(precision, scale)?,
    ))
}

/// Converts a Vortex [`DecimalArray`] into an Arrow `Decimal64Array`.
///
/// Values stored in a narrower integer type are widened losslessly; values
/// stored in a wider type are narrowed and the conversion fails if any value
/// does not fit in an `i64`. The i64-backed case is zero-copy.
fn to_arrow_decimal64(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
    let validity = array.validity_mask().to_null_buffer();
    let (precision, scale) = (
        array.decimal_dtype().precision(),
        array.decimal_dtype().scale(),
    );
    let values: Buffer<i64> = match array.values_type() {
        // Widening conversions: always safe.
        DecimalValueType::I8 => {
            Buffer::from_trusted_len_iter(array.buffer::<i8>().into_iter().map(|v| v.as_()))
        }
        DecimalValueType::I16 => {
            Buffer::from_trusted_len_iter(array.buffer::<i16>().into_iter().map(|v| v.as_()))
        }
        DecimalValueType::I32 => {
            Buffer::from_trusted_len_iter(array.buffer::<i32>().into_iter().map(|v| v.as_()))
        }
        // Same width: reuse the existing buffer without copying.
        DecimalValueType::I64 => array.buffer::<i64>(),
        // Narrowing conversions: bail on the first value that overflows i64.
        DecimalValueType::I128 => array
            .buffer::<i128>()
            .into_iter()
            .map(|v| {
                v.to_i64()
                    .ok_or_else(|| vortex_err!("i128 to i64 narrowing cannot be done safely"))
            })
            .process_results(|it| Buffer::from_trusted_len_iter(it))?,
        DecimalValueType::I256 => array
            .buffer::<vortex_scalar::i256>()
            .into_iter()
            .map(|v| {
                v.to_i64()
                    .ok_or_else(|| vortex_err!("i256 to i64 narrowing cannot be done safely"))
            })
            .process_results(|it| Buffer::from_trusted_len_iter(it))?,
        // DecimalValueType is extensible; reject anything we don't recognize.
        _ => vortex_bail!("unknown value type {:?}", array.values_type()),
    };
    Ok(Arc::new(
        ArrowDecimal64Array::new(values.into_arrow_scalar_buffer(), validity)
            .with_precision_and_scale(precision, scale)?,
    ))
}

fn to_arrow_decimal128(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
let null_buffer = array.validity_mask().to_null_buffer();
let buffer: Buffer<i128> = match array.values_type() {
Expand Down Expand Up @@ -586,6 +700,60 @@ mod tests {
assert_eq!(arrow_decimal.value(2), 12);
}

#[rstest]
#[case(0i8)]
#[case(0i16)]
#[case(0i32)]
#[case(0i64)]
#[case(0i128)]
#[case(vortex_scalar::i256::ZERO)]
fn to_arrow_decimal32<T: NativeDecimalType>(#[case] _decimal_type: T) {
    use arrow_array::Decimal32Array;

    // Build a small decimal(2, 1) array from each supported backing width.
    let mut builder = DecimalBuilder::new::<T>(2, 1, false.into());
    for value in [10, 11, 12] {
        builder.append_value(value);
    }

    let arrow_array = builder
        .finish()
        .into_arrow(&DataType::Decimal32(2, 1))
        .unwrap();
    let decimal32 = arrow_array
        .as_any()
        .downcast_ref::<Decimal32Array>()
        .unwrap();
    for (index, expected) in [10, 11, 12].into_iter().enumerate() {
        assert_eq!(decimal32.value(index), expected);
    }
}

#[rstest]
#[case(0i8)]
#[case(0i16)]
#[case(0i32)]
#[case(0i64)]
#[case(0i128)]
#[case(vortex_scalar::i256::ZERO)]
fn to_arrow_decimal64<T: NativeDecimalType>(#[case] _decimal_type: T) {
    use arrow_array::Decimal64Array;

    // Build a small decimal(2, 1) array from each supported backing width.
    let mut builder = DecimalBuilder::new::<T>(2, 1, false.into());
    for value in [10, 11, 12] {
        builder.append_value(value);
    }

    let arrow_array = builder
        .finish()
        .into_arrow(&DataType::Decimal64(2, 1))
        .unwrap();
    let decimal64 = arrow_array
        .as_any()
        .downcast_ref::<Decimal64Array>()
        .unwrap();
    for (index, expected) in [10, 11, 12].into_iter().enumerate() {
        assert_eq!(decimal64.value(index), expected);
    }
}

#[rstest]
#[case(0i8)]
#[case(0i16)]
Expand Down
34 changes: 29 additions & 5 deletions vortex-array/src/arrow/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::sync::Arc;

use arrow_array::cast::{AsArray, as_null_array};
use arrow_array::types::{
ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal32Type, Decimal64Type,
Decimal128Type, Decimal256Type, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type,
Int32Type, Int64Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_array::{
Array as ArrowArray, ArrowPrimitiveType, BooleanArray as ArrowBooleanArray,
Expand Down Expand Up @@ -104,6 +104,24 @@ impl_from_arrow_primitive!(Float16Type);
impl_from_arrow_primitive!(Float32Type);
impl_from_arrow_primitive!(Float64Type);

impl FromArrowArray<&ArrowPrimitiveArray<Decimal32Type>> for ArrayRef {
    /// Converts an Arrow `Decimal32Array` into a Vortex decimal array,
    /// carrying over precision, scale, and the null buffer.
    fn from_arrow(array: &ArrowPrimitiveArray<Decimal32Type>, nullable: bool) -> Self {
        let dtype = DecimalDType::new(array.precision(), array.scale());
        let values = Buffer::from_arrow_scalar_buffer(array.values().clone());
        DecimalArray::new(values, dtype, nulls(array.nulls(), nullable)).into_array()
    }
}

impl FromArrowArray<&ArrowPrimitiveArray<Decimal64Type>> for ArrayRef {
    /// Converts an Arrow `Decimal64Array` into a Vortex decimal array,
    /// carrying over precision, scale, and the null buffer.
    fn from_arrow(array: &ArrowPrimitiveArray<Decimal64Type>, nullable: bool) -> Self {
        let dtype = DecimalDType::new(array.precision(), array.scale());
        let values = Buffer::from_arrow_scalar_buffer(array.values().clone());
        DecimalArray::new(values, dtype, nulls(array.nulls(), nullable)).into_array()
    }
}

impl FromArrowArray<&ArrowPrimitiveArray<Decimal128Type>> for ArrayRef {
fn from_arrow(array: &ArrowPrimitiveArray<Decimal128Type>, nullable: bool) -> Self {
let decimal_type = DecimalDType::new(array.precision(), array.scale());
Expand Down Expand Up @@ -433,6 +451,12 @@ impl FromArrowArray<&dyn ArrowArray> for ArrayRef {
}
ArrowTimeUnit::Second | ArrowTimeUnit::Millisecond => unreachable!(),
},
DataType::Decimal32(..) => {
Self::from_arrow(array.as_primitive::<Decimal32Type>(), nullable)
}
DataType::Decimal64(..) => {
Self::from_arrow(array.as_primitive::<Decimal64Type>(), nullable)
}
DataType::Decimal128(..) => {
Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
}
Expand Down
2 changes: 2 additions & 0 deletions vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ datafusion-datasource = { workspace = true, default-features = false }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-adapter = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions vortex-datafusion/src/convert/exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use arrow_schema::{DataType, Schema};
use datafusion_expr::Operator as DFOperator;
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
use datafusion_physical_plan::expressions as df_expr;
use vortex::error::{VortexResult, vortex_bail, vortex_err};
use vortex::expr::{BinaryExpr, ExprRef, LikeExpr, Operator, and, get_item, lit, root};
Expand Down Expand Up @@ -122,6 +123,12 @@ impl TryFromDataFusion<DFOperator> for Operator {
}

pub(crate) fn can_be_pushed_down(expr: &PhysicalExprRef, schema: &Schema) -> bool {
// We currently do not support pushdown of dynamic expressions in DF.
// See issue: https://github.com/vortex-data/vortex/issues/4034
if is_dynamic_physical_expr(expr) {
return false;
}

let expr = expr.as_any();
if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
can_binary_be_pushed_down(binary, schema)
Expand Down
Loading
Loading