Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/query/catalog/src/plan/pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use std::fmt::Debug;

use common_expression::types::DataType;
use common_expression::RemoteExpr;
use common_expression::TableField;
use common_expression::TableSchema;

use crate::plan::Projection;
Expand Down Expand Up @@ -48,7 +50,52 @@ pub struct PushDownInfo {
pub order_by: Vec<(RemoteExpr<String>, bool, bool)>,
}

/// TopK is a wrapper for topk push down items.
/// We only take the first column in order_by as the topk column.
#[derive(Debug, Clone)]
pub struct TopK {
pub limit: usize,
pub order_by: TableField,
pub asc: bool,
pub column_id: u32,
}

impl PushDownInfo {
pub fn top_k(&self, schema: &TableSchema, support: fn(&DataType) -> bool) -> Option<TopK> {
if !self.order_by.is_empty() && self.limit.is_some() {
let order = &self.order_by[0];
let limit = self.limit.unwrap();

const MAX_TOPK_LIMIT: usize = 1000;
if limit > MAX_TOPK_LIMIT {
return None;
}

if let RemoteExpr::<String>::ColumnRef { id, .. } = &order.0 {
let field = schema.field_with_name(id).unwrap();
let data_type: DataType = field.data_type().into();
if !support(&data_type) {
return None;
}

let leaf_fields = schema.leaf_fields();
let column_id = leaf_fields.iter().position(|p| p == field).unwrap();

let top_k = TopK {
limit: self.limit.unwrap(),
order_by: field.clone(),
asc: order.1,
column_id: column_id as u32,
};
Some(top_k)
} else {
None
}
} else {
None
}
}

pub fn prewhere_of_push_downs(push_downs: &Option<PushDownInfo>) -> Option<PrewhereInfo> {
if let Some(PushDownInfo { prewhere, .. }) = push_downs {
prewhere.clone()
Expand Down
104 changes: 16 additions & 88 deletions src/query/expression/src/kernels/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,124 +20,39 @@ use common_arrow::arrow::buffer::Buffer;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::filter_helper::FilterHelpers;
use crate::types::array::ArrayColumnBuilder;
use crate::types::nullable::NullableColumn;
use crate::types::number::NumberColumn;
use crate::types::number::NumberScalar;
use crate::types::string::StringColumnBuilder;
use crate::types::AnyType;
use crate::types::ArgType;
use crate::types::ArrayType;
use crate::types::BooleanType;
use crate::types::StringType;
use crate::types::ValueType;
use crate::types::VariantType;
use crate::with_number_mapped_type;
use crate::with_number_type;
use crate::BlockEntry;
use crate::Column;
use crate::ColumnBuilder;
use crate::DataBlock;
use crate::Scalar;
use crate::TypeDeserializer;
use crate::Value;

impl DataBlock {
// check if the predicate has any valid row
pub fn filter_exists(predicate: &Value<AnyType>) -> Result<bool> {
let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| {
ErrorCode::BadDataValueType(format!(
"Filter predict column does not support type '{:?}'",
predicate
))
})?;
match predicate {
Value::Scalar(s) => Ok(s),
Value::Column(bitmap) => Ok(bitmap.len() != bitmap.unset_bits()),
}
}

pub fn filter(self, predicate: &Value<AnyType>) -> Result<DataBlock> {
if self.num_rows() == 0 {
return Ok(self);
}

let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| {
let predicate = FilterHelpers::cast_to_nonull_boolean(predicate).ok_or_else(|| {
ErrorCode::BadDataValueType(format!(
"Filter predict column does not support type '{:?}'",
predicate
))
})?;

match predicate {
Value::Scalar(s) => {
if s {
Ok(self)
} else {
Ok(self.slice(0..0))
}
}
Value::Column(bitmap) => Self::filter_with_bitmap(self, &bitmap),
}
}

// Must be numeric, boolean, or string value type
pub fn cast_to_nonull_boolean(predicate: &Value<AnyType>) -> Option<Value<BooleanType>> {
match predicate {
Value::Scalar(s) => Self::cast_scalar_to_boolean(s).map(Value::Scalar),
Value::Column(c) => Self::cast_column_to_boolean(c).map(Value::Column),
}
}

fn cast_scalar_to_boolean(s: &Scalar) -> Option<bool> {
match s {
Scalar::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num {
NumberScalar::SRC_TYPE(value) => Some(value != &SRC_TYPE::default()),
}),
Scalar::Boolean(value) => Some(*value),
Scalar::String(value) => Some(!value.is_empty()),
Scalar::Timestamp(value) => Some(*value != 0),
Scalar::Date(value) => Some(*value != 0),
Scalar::Null => Some(false),
_ => None,
}
}

fn cast_column_to_boolean(c: &Column) -> Option<Bitmap> {
match c {
Column::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num {
NumberColumn::SRC_TYPE(value) => Some(BooleanType::column_from_iter(
value.iter().map(|v| v != &SRC_TYPE::default()),
&[],
)),
}),
Column::Boolean(value) => Some(value.clone()),
Column::String(value) => Some(BooleanType::column_from_iter(
value.iter().map(|s| !s.is_empty()),
&[],
)),
Column::Timestamp(value) => Some(BooleanType::column_from_iter(
value.iter().map(|v| *v != 0),
&[],
)),
Column::Date(value) => Some(BooleanType::column_from_iter(
value.iter().map(|v| *v != 0),
&[],
)),
Column::Null { len } => Some(MutableBitmap::from_len_zeroed(*len).into()),
Column::Nullable(c) => {
let inner = Self::cast_column_to_boolean(&c.column)?;
Some((&inner) & (&c.validity))
}
_ => None,
}
}

pub fn try_as_const_bool(value: &Value<BooleanType>) -> Result<Option<bool>> {
match value {
Value::Scalar(v) => Ok(Some(*v)),
_ => Ok(None),
}
self.filter_boolean_value(predicate)
}

pub fn filter_with_bitmap(block: DataBlock, bitmap: &Bitmap) -> Result<DataBlock> {
Expand Down Expand Up @@ -169,6 +84,19 @@ impl DataBlock {
}
}
}

pub fn filter_boolean_value(self, filter: Value<BooleanType>) -> Result<DataBlock> {
match filter {
Value::Scalar(s) => {
if s {
Ok(self)
} else {
Ok(self.slice(0..0))
}
}
Value::Column(bitmap) => Self::filter_with_bitmap(self, &bitmap),
}
}
}

impl Column {
Expand Down
2 changes: 2 additions & 0 deletions src/query/expression/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ mod scatter;
mod sort;
mod take;
mod take_chunks;
mod topk;

pub use group_by::*;
pub use group_by_hash::*;
pub use sort::*;
pub use take_chunks::*;
pub use topk::*;
Loading