Skip to content

Commit ec3543b

Browse files
yjshen and alamb authored
Introduce RowLayout to represent rows for different purposes (#2261)
* Introduce RowLayout to represent rows for different purposes
* revert default
* Apply suggestions from code review
  Co-authored-by: Andrew Lamb <[email protected]>
* more &schema
* more tests and refactor
* logs for flaky test
* unwanted cargo change

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 7548e96 commit ec3543b

File tree

10 files changed

+579
-218
lines changed

10 files changed

+579
-218
lines changed

datafusion/core/benches/jit.rs

Lines changed: 30 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,9 @@ extern crate datafusion;
2323
mod data_utils;
2424
use crate::criterion::Criterion;
2525
use crate::data_utils::{create_record_batches, create_schema};
26-
use datafusion::row::writer::{bench_write_batch, bench_write_batch_jit};
26+
use datafusion::row::jit::writer::bench_write_batch_jit;
27+
use datafusion::row::writer::bench_write_batch;
28+
use datafusion::row::RowType;
2729
use std::sync::Arc;
2830

2931
fn criterion_benchmark(c: &mut Criterion) {
@@ -35,15 +37,38 @@ fn criterion_benchmark(c: &mut Criterion) {
3537
let batches =
3638
create_record_batches(schema.clone(), array_len, partitions_len, batch_size);
3739

38-
c.bench_function("row serializer", |b| {
40+
c.bench_function("compact row serializer", |b| {
3941
b.iter(|| {
40-
criterion::black_box(bench_write_batch(&batches, schema.clone()).unwrap())
42+
criterion::black_box(
43+
bench_write_batch(&batches, schema.clone(), RowType::Compact).unwrap(),
44+
)
4145
})
4246
});
4347

44-
c.bench_function("row serializer jit", |b| {
48+
c.bench_function("word aligned row serializer", |b| {
4549
b.iter(|| {
46-
criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap())
50+
criterion::black_box(
51+
bench_write_batch(&batches, schema.clone(), RowType::WordAligned)
52+
.unwrap(),
53+
)
54+
})
55+
});
56+
57+
c.bench_function("compact row serializer jit", |b| {
58+
b.iter(|| {
59+
criterion::black_box(
60+
bench_write_batch_jit(&batches, schema.clone(), RowType::Compact)
61+
.unwrap(),
62+
)
63+
})
64+
});
65+
66+
c.bench_function("word aligned row serializer jit", |b| {
67+
b.iter(|| {
68+
criterion::black_box(
69+
bench_write_batch_jit(&batches, schema.clone(), RowType::WordAligned)
70+
.unwrap(),
71+
)
4772
})
4873
});
4974
}

datafusion/core/src/row/jit/mod.rs

Lines changed: 133 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,8 @@
1717

1818
//! Just-In-Time(JIT) version for row reader and writers
1919
20-
mod reader;
21-
mod writer;
20+
pub mod reader;
21+
pub mod writer;
2222

2323
#[macro_export]
2424
/// register external functions to the assembler
@@ -46,42 +46,43 @@ mod tests {
4646
use crate::error::Result;
4747
use crate::row::jit::reader::read_as_batch_jit;
4848
use crate::row::jit::writer::write_batch_unchecked_jit;
49+
use crate::row::layout::RowType::{Compact, WordAligned};
4950
use arrow::record_batch::RecordBatch;
5051
use arrow::{array::*, datatypes::*};
5152
use datafusion_jit::api::Assembler;
5253
use std::sync::Arc;
5354
use DataType::*;
5455

5556
macro_rules! fn_test_single_type {
56-
($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
57+
($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
5758
paste::item! {
5859
#[test]
5960
#[allow(non_snake_case)]
60-
fn [<test_single_ $TYPE _jit>]() -> Result<()> {
61+
fn [<test_ $ROWTYPE _single_ $TYPE _jit>]() -> Result<()> {
6162
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)]));
6263
let a = $ARRAY::from($VEC);
6364
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
6465
let mut vector = vec![0; 1024];
6566
let assembler = Assembler::default();
6667
let row_offsets =
67-
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
68-
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
68+
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
69+
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
6970
assert_eq!(batch, output_batch);
7071
Ok(())
7172
}
7273

7374
#[test]
7475
#[allow(non_snake_case)]
75-
fn [<test_single_ $TYPE _jit_null_free>]() -> Result<()> {
76+
fn [<test_ $ROWTYPE _single_ $TYPE _jit_null_free>]() -> Result<()> {
7677
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
7778
let v = $VEC.into_iter().filter(|o| o.is_some()).collect::<Vec<_>>();
7879
let a = $ARRAY::from(v);
7980
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
8081
let mut vector = vec![0; 1024];
8182
let assembler = Assembler::default();
8283
let row_offsets =
83-
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
84-
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
84+
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
85+
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
8586
assert_eq!(batch, output_batch);
8687
Ok(())
8788
}
@@ -92,85 +93,190 @@ mod tests {
9293
fn_test_single_type!(
9394
BooleanArray,
9495
Boolean,
95-
vec![Some(true), Some(false), None, Some(true), None]
96+
vec![Some(true), Some(false), None, Some(true), None],
97+
Compact
98+
);
99+
100+
fn_test_single_type!(
101+
BooleanArray,
102+
Boolean,
103+
vec![Some(true), Some(false), None, Some(true), None],
104+
WordAligned
96105
);
97106

98107
fn_test_single_type!(
99108
Int8Array,
100109
Int8,
101-
vec![Some(5), Some(7), None, Some(0), Some(111)]
110+
vec![Some(5), Some(7), None, Some(0), Some(111)],
111+
Compact
112+
);
113+
114+
fn_test_single_type!(
115+
Int8Array,
116+
Int8,
117+
vec![Some(5), Some(7), None, Some(0), Some(111)],
118+
WordAligned
119+
);
120+
121+
fn_test_single_type!(
122+
Int16Array,
123+
Int16,
124+
vec![Some(5), Some(7), None, Some(0), Some(111)],
125+
Compact
102126
);
103127

104128
fn_test_single_type!(
105129
Int16Array,
106130
Int16,
107-
vec![Some(5), Some(7), None, Some(0), Some(111)]
131+
vec![Some(5), Some(7), None, Some(0), Some(111)],
132+
WordAligned
108133
);
109134

110135
fn_test_single_type!(
111136
Int32Array,
112137
Int32,
113-
vec![Some(5), Some(7), None, Some(0), Some(111)]
138+
vec![Some(5), Some(7), None, Some(0), Some(111)],
139+
Compact
140+
);
141+
142+
fn_test_single_type!(
143+
Int32Array,
144+
Int32,
145+
vec![Some(5), Some(7), None, Some(0), Some(111)],
146+
WordAligned
147+
);
148+
149+
fn_test_single_type!(
150+
Int64Array,
151+
Int64,
152+
vec![Some(5), Some(7), None, Some(0), Some(111)],
153+
Compact
114154
);
115155

116156
fn_test_single_type!(
117157
Int64Array,
118158
Int64,
119-
vec![Some(5), Some(7), None, Some(0), Some(111)]
159+
vec![Some(5), Some(7), None, Some(0), Some(111)],
160+
WordAligned
120161
);
121162

122163
fn_test_single_type!(
123164
UInt8Array,
124165
UInt8,
125-
vec![Some(5), Some(7), None, Some(0), Some(111)]
166+
vec![Some(5), Some(7), None, Some(0), Some(111)],
167+
Compact
168+
);
169+
170+
fn_test_single_type!(
171+
UInt8Array,
172+
UInt8,
173+
vec![Some(5), Some(7), None, Some(0), Some(111)],
174+
WordAligned
126175
);
127176

128177
fn_test_single_type!(
129178
UInt16Array,
130179
UInt16,
131-
vec![Some(5), Some(7), None, Some(0), Some(111)]
180+
vec![Some(5), Some(7), None, Some(0), Some(111)],
181+
Compact
182+
);
183+
184+
fn_test_single_type!(
185+
UInt16Array,
186+
UInt16,
187+
vec![Some(5), Some(7), None, Some(0), Some(111)],
188+
WordAligned
189+
);
190+
191+
fn_test_single_type!(
192+
UInt32Array,
193+
UInt32,
194+
vec![Some(5), Some(7), None, Some(0), Some(111)],
195+
Compact
132196
);
133197

134198
fn_test_single_type!(
135199
UInt32Array,
136200
UInt32,
137-
vec![Some(5), Some(7), None, Some(0), Some(111)]
201+
vec![Some(5), Some(7), None, Some(0), Some(111)],
202+
WordAligned
203+
);
204+
205+
fn_test_single_type!(
206+
UInt64Array,
207+
UInt64,
208+
vec![Some(5), Some(7), None, Some(0), Some(111)],
209+
Compact
138210
);
139211

140212
fn_test_single_type!(
141213
UInt64Array,
142214
UInt64,
143-
vec![Some(5), Some(7), None, Some(0), Some(111)]
215+
vec![Some(5), Some(7), None, Some(0), Some(111)],
216+
WordAligned
217+
);
218+
219+
fn_test_single_type!(
220+
Float32Array,
221+
Float32,
222+
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
223+
Compact
144224
);
145225

146226
fn_test_single_type!(
147227
Float32Array,
148228
Float32,
149-
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
229+
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
230+
WordAligned
231+
);
232+
233+
fn_test_single_type!(
234+
Float64Array,
235+
Float64,
236+
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
237+
Compact
150238
);
151239

152240
fn_test_single_type!(
153241
Float64Array,
154242
Float64,
155-
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
243+
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
244+
WordAligned
245+
);
246+
247+
fn_test_single_type!(
248+
Date32Array,
249+
Date32,
250+
vec![Some(5), Some(7), None, Some(0), Some(111)],
251+
Compact
156252
);
157253

158254
fn_test_single_type!(
159255
Date32Array,
160256
Date32,
161-
vec![Some(5), Some(7), None, Some(0), Some(111)]
257+
vec![Some(5), Some(7), None, Some(0), Some(111)],
258+
WordAligned
259+
);
260+
261+
fn_test_single_type!(
262+
Date64Array,
263+
Date64,
264+
vec![Some(5), Some(7), None, Some(0), Some(111)],
265+
Compact
162266
);
163267

164268
fn_test_single_type!(
165269
Date64Array,
166270
Date64,
167-
vec![Some(5), Some(7), None, Some(0), Some(111)]
271+
vec![Some(5), Some(7), None, Some(0), Some(111)],
272+
WordAligned
168273
);
169274

170275
fn_test_single_type!(
171276
StringArray,
172277
Utf8,
173-
vec![Some("hello"), Some("world"), None, Some(""), Some("")]
278+
vec![Some("hello"), Some("world"), None, Some(""), Some("")],
279+
Compact
174280
);
175281

176282
#[test]
@@ -190,10 +296,11 @@ mod tests {
190296
0,
191297
schema.clone(),
192298
&assembler,
299+
Compact,
193300
)?
194301
};
195302
let output_batch =
196-
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
303+
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
197304
assert_eq!(batch, output_batch);
198305
Ok(())
199306
}
@@ -214,10 +321,11 @@ mod tests {
214321
0,
215322
schema.clone(),
216323
&assembler,
324+
Compact,
217325
)?
218326
};
219327
let output_batch =
220-
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
328+
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
221329
assert_eq!(batch, output_batch);
222330
Ok(())
223331
}

datafusion/core/src/row/jit/reader.rs

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
use crate::error::{DataFusionError, Result};
2121
use crate::reg_fn;
2222
use crate::row::jit::fn_name;
23+
use crate::row::layout::RowType;
2324
use crate::row::reader::RowReader;
2425
use crate::row::reader::*;
2526
use crate::row::MutableRecordBatch;
@@ -38,10 +39,11 @@ pub fn read_as_batch_jit(
3839
schema: Arc<Schema>,
3940
offsets: &[usize],
4041
assembler: &Assembler,
42+
row_type: RowType,
4143
) -> Result<RecordBatch> {
4244
let row_num = offsets.len();
4345
let mut output = MutableRecordBatch::new(row_num, schema.clone());
44-
let mut row = RowReader::new(&schema);
46+
let mut row = RowReader::new(&schema, row_type);
4547
register_read_functions(assembler)?;
4648
let gen_func = gen_read_row(&schema, assembler)?;
4749
let mut jit = assembler.create_jit();
@@ -102,10 +104,7 @@ fn register_read_functions(asm: &Assembler) -> Result<()> {
102104
Ok(())
103105
}
104106

105-
fn gen_read_row(
106-
schema: &Arc<Schema>,
107-
assembler: &Assembler,
108-
) -> Result<GeneratedFunction> {
107+
fn gen_read_row(schema: &Schema, assembler: &Assembler) -> Result<GeneratedFunction> {
109108
use DataType::*;
110109
let mut builder = assembler
111110
.new_func_builder("read_row")

0 commit comments

Comments
 (0)