Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions datafusion/core/benches/jit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ extern crate datafusion;
mod data_utils;
use crate::criterion::Criterion;
use crate::data_utils::{create_record_batches, create_schema};
use datafusion::row::writer::{bench_write_batch, bench_write_batch_jit};
use datafusion::row::jit::writer::bench_write_compact_batch_jit;
use datafusion::row::writer::bench_write_compact_batch;
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -37,13 +38,17 @@ fn criterion_benchmark(c: &mut Criterion) {

c.bench_function("row serializer", |b| {
b.iter(|| {
criterion::black_box(bench_write_batch(&batches, schema.clone()).unwrap())
criterion::black_box(
bench_write_compact_batch(&batches, schema.clone()).unwrap(),
)
})
});

c.bench_function("row serializer jit", |b| {
b.iter(|| {
criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap())
criterion::black_box(
bench_write_compact_batch_jit(&batches, schema.clone()).unwrap(),
)
})
});
}
Expand Down
30 changes: 16 additions & 14 deletions datafusion/core/src/row/jit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! Just-In-Time(JIT) version for row reader and writers

mod reader;
mod writer;
pub mod reader;
pub mod writer;

#[macro_export]
/// register external functions to the assembler
Expand All @@ -44,8 +44,8 @@ fn fn_name<T>(f: T) -> &'static str {
#[cfg(test)]
mod tests {
use crate::error::Result;
use crate::row::jit::reader::read_as_batch_jit;
use crate::row::jit::writer::write_batch_unchecked_jit;
use crate::row::jit::reader::read_compact_rows_as_batch_jit;
use crate::row::jit::writer::write_compact_batch_unchecked_jit;
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
use datafusion_jit::api::Assembler;
Expand All @@ -64,8 +64,8 @@ mod tests {
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ write_compact_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
let output_batch = { read_compact_rows_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand All @@ -80,8 +80,8 @@ mod tests {
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ write_compact_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
let output_batch = { read_compact_rows_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand Down Expand Up @@ -183,7 +183,7 @@ mod tests {
let mut vector = vec![0; 8192];
let assembler = Assembler::default();
let row_offsets = {
write_batch_unchecked_jit(
write_compact_batch_unchecked_jit(
&mut vector,
0,
&batch,
Expand All @@ -192,8 +192,9 @@ mod tests {
&assembler,
)?
};
let output_batch =
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
let output_batch = {
read_compact_rows_as_batch_jit(&vector, schema, &row_offsets, &assembler)?
};
assert_eq!(batch, output_batch);
Ok(())
}
Expand All @@ -207,7 +208,7 @@ mod tests {
let mut vector = vec![0; 8192];
let assembler = Assembler::default();
let row_offsets = {
write_batch_unchecked_jit(
write_compact_batch_unchecked_jit(
&mut vector,
0,
&batch,
Expand All @@ -216,8 +217,9 @@ mod tests {
&assembler,
)?
};
let output_batch =
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
let output_batch = {
read_compact_rows_as_batch_jit(&vector, schema, &row_offsets, &assembler)?
};
assert_eq!(batch, output_batch);
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/row/jit/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use crate::error::{DataFusionError, Result};
use crate::reg_fn;
use crate::row::jit::fn_name;
use crate::row::layout::RowType;
use crate::row::reader::RowReader;
use crate::row::reader::*;
use crate::row::MutableRecordBatch;
Expand All @@ -33,15 +34,15 @@ use std::sync::Arc;

/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch

pub fn read_as_batch_jit(
pub fn read_compact_rows_as_batch_jit(
data: &[u8],
schema: Arc<Schema>,
offsets: &[usize],
assembler: &Assembler,
) -> Result<RecordBatch> {
let row_num = offsets.len();
let mut output = MutableRecordBatch::new(row_num, schema.clone());
let mut row = RowReader::new(&schema);
let mut row = RowReader::new(&schema, RowType::Compact);
register_read_functions(assembler)?;
let gen_func = gen_read_row(&schema, assembler)?;
let mut jit = assembler.create_jit();
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/row/jit/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use crate::error::Result;
use crate::reg_fn;
use crate::row::jit::fn_name;
use crate::row::layout::RowType;
use crate::row::schema_null_free;
use crate::row::writer::RowWriter;
use crate::row::writer::*;
Expand All @@ -36,15 +37,15 @@ use std::sync::Arc;
/// # Panics
///
/// This function will panic if the output buffer doesn't have enough space to hold all the rows
pub fn write_batch_unchecked_jit(
pub fn write_compact_batch_unchecked_jit(
output: &mut [u8],
offset: usize,
batch: &RecordBatch,
row_idx: usize,
schema: Arc<Schema>,
assembler: &Assembler,
) -> Result<Vec<usize>> {
let mut writer = RowWriter::new(&schema);
let mut writer = RowWriter::new(&schema, RowType::Compact);
let mut current_offset = offset;
let mut offsets = vec![];
register_write_functions(assembler)?;
Expand All @@ -71,12 +72,12 @@ pub fn write_batch_unchecked_jit(

/// bench jit version write
#[inline(never)]
pub fn bench_write_batch_jit(
pub fn bench_write_compact_batch_jit(
batches: &[Vec<RecordBatch>],
schema: Arc<Schema>,
) -> Result<Vec<usize>> {
let assembler = Assembler::default();
let mut writer = RowWriter::new(&schema);
let mut writer = RowWriter::new(&schema, RowType::Compact);
let mut lengths = vec![];
register_write_functions(&assembler)?;
let gen_func = gen_write_row(&schema, &assembler)?;
Expand Down
88 changes: 83 additions & 5 deletions datafusion/core/src/row/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,94 @@

//! Various row layout for different use case

use crate::row::{schema_null_free, var_length};
use crate::row::{row_supported, schema_null_free, var_length};
use arrow::datatypes::{DataType, Schema};
use arrow::util::bit_util::{ceil, round_upto_power_of_2};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

const UTF8_DEFAULT_SIZE: usize = 20;
const BINARY_DEFAULT_SIZE: usize = 100;

#[derive(Copy, Clone, Debug)]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main changes are below. Other files changes are almost mechanical.

/// Type of a RowLayout
pub enum RowType {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// This type of layout will store each field with minimum bytes for space efficiency.
/// Its typical use case represents a sorting payload that accesses all row fields as a unit.
Compact,
/// This type of layout will store one 8-byte word per field for CPU-friendly,
/// It is mainly used to represent the rows with frequently updated content,
/// for example, grouping state for hash aggregation.
WordAligned,
// RawComparable,
}

/// Reveals how the fields of a record are stored in the raw-bytes format
pub(crate) struct RowLayout {
/// Type of the layout
type_: RowType,
/// If a row is null free according to its schema
pub(crate) null_free: bool,
/// The number of bytes used to store null bits for each field.
pub(crate) null_width: usize,
/// Length in bytes for `values` part of the current tuple.
pub(crate) values_width: usize,
/// Total number of fields for each tuple.
pub(crate) field_count: usize,
/// Starting offset for each fields in the raw bytes.
pub(crate) field_offsets: Vec<usize>,
}

impl RowLayout {
pub(crate) fn new(schema: &Arc<Schema>, type_: RowType) -> Self {
assert!(row_supported(schema));
let null_free = schema_null_free(schema);
let field_count = schema.fields().len();
let null_width = if null_free { 0 } else { ceil(field_count, 8) };
let (field_offsets, values_width) = match type_ {
RowType::Compact => compact_offsets(null_width, schema),
RowType::WordAligned => word_aligned_offsets(null_width, schema),
};
Self {
type_,
null_free,
null_width,
values_width,
field_count,
field_offsets,
}
}

#[inline(always)]
pub(crate) fn init_varlena_offset(&self) -> usize {
self.null_width + self.values_width
}
}

impl Debug for RowLayout {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RowLayout")
.field("type", &self.type_)
.field("null_width", &self.null_width)
.field("values_width", &self.values_width)
.field("field_count", &self.field_count)
.field("offsets", &self.field_offsets)
.finish()
}
}

/// Get relative offsets for each field and total width for values
pub fn get_offsets(null_width: usize, schema: &Arc<Schema>) -> (Vec<usize>, usize) {
fn compact_offsets(null_width: usize, schema: &Arc<Schema>) -> (Vec<usize>, usize) {
let mut offsets = vec![];
let mut offset = null_width;
for f in schema.fields() {
offsets.push(offset);
offset += type_width(f.data_type());
offset += compact_type_width(f.data_type());
}
(offsets, offset - null_width)
}

fn type_width(dt: &DataType) -> usize {
fn compact_type_width(dt: &DataType) -> usize {
use DataType::*;
if var_length(dt) {
return std::mem::size_of::<u64>();
Expand All @@ -50,13 +118,23 @@ fn type_width(dt: &DataType) -> usize {
}
}

fn word_aligned_offsets(null_width: usize, schema: &Arc<Schema>) -> (Vec<usize>, usize) {
let mut offsets = vec![];
let mut offset = null_width;
for _ in schema.fields() {
offsets.push(offset);
offset += 8; // a 8-bytes word for each field
}
(offsets, offset - null_width)
}

/// Estimate row width based on schema
pub fn estimate_row_width(schema: &Arc<Schema>) -> usize {
let null_free = schema_null_free(schema);
let field_count = schema.fields().len();
let mut width = if null_free { 0 } else { ceil(field_count, 8) };
for f in schema.fields() {
width += type_width(f.data_type());
width += compact_type_width(f.data_type());
match f.data_type() {
DataType::Utf8 => width += UTF8_DEFAULT_SIZE,
DataType::Binary => width += BINARY_DEFAULT_SIZE,
Expand Down
30 changes: 15 additions & 15 deletions datafusion/core/src/row/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use arrow::record_batch::RecordBatch;
use std::sync::Arc;

#[cfg(feature = "jit")]
mod jit;
pub mod jit;
mod layout;
pub mod reader;
mod validity;
Expand Down Expand Up @@ -155,8 +155,8 @@ mod tests {
use crate::physical_plan::file_format::FileScanConfig;
use crate::physical_plan::{collect, ExecutionPlan};
use crate::prelude::SessionContext;
use crate::row::reader::read_as_batch;
use crate::row::writer::write_batch_unchecked;
use crate::row::reader::read_compact_rows_as_batch;
use crate::row::writer::write_compact_batch_unchecked;
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
use datafusion_data_access::object_store::local::LocalFileSystem;
Expand All @@ -176,8 +176,8 @@ mod tests {
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let row_offsets =
{ write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
{ write_compact_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
let output_batch = { read_compact_rows_as_batch(&vector, schema, &row_offsets)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand All @@ -191,8 +191,8 @@ mod tests {
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let row_offsets =
{ write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
{ write_compact_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
let output_batch = { read_compact_rows_as_batch(&vector, schema, &row_offsets)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand Down Expand Up @@ -293,8 +293,8 @@ mod tests {
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 8192];
let row_offsets =
{ write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
{ write_compact_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
let output_batch = { read_compact_rows_as_batch(&vector, schema, &row_offsets)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand All @@ -307,8 +307,8 @@ mod tests {
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 8192];
let row_offsets =
{ write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
{ write_compact_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
let output_batch = { read_compact_rows_as_batch(&vector, schema, &row_offsets)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand All @@ -327,8 +327,8 @@ mod tests {

let mut vector = vec![0; 20480];
let row_offsets =
{ write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) };
let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
{ write_compact_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) };
let output_batch = { read_compact_rows_as_batch(&vector, schema, &row_offsets)? };
assert_eq!(*batch, output_batch);

Ok(())
Expand All @@ -341,7 +341,7 @@ mod tests {
let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
let schema = batch.schema();
let mut vector = vec![0; 1024];
write_batch_unchecked(&mut vector, 0, &batch, 0, schema);
write_compact_batch_unchecked(&mut vector, 0, &batch, 0, schema);
}

#[test]
Expand All @@ -354,7 +354,7 @@ mod tests {
)]));
let vector = vec![0; 1024];
let row_offsets = vec![0];
read_as_batch(&vector, schema, &row_offsets).unwrap();
read_compact_rows_as_batch(&vector, schema, &row_offsets).unwrap();
}

async fn get_exec(
Expand Down
Loading