73 changes: 50 additions & 23 deletions arrow-data/src/data.rs
@@ -28,7 +28,6 @@ use std::mem;
use std::ops::Range;
use std::sync::Arc;

use crate::data::private::UnsafeFlag;
use crate::{equal, validate_binary_view, validate_string_view};

#[inline]
@@ -1781,33 +1780,61 @@ impl PartialEq for ArrayData {
}
}

/// A boolean flag that cannot be mutated outside of unsafe code.
Comment (Contributor Author): I propose to make this UnsafeFlag public (and added examples and more docs) so I could use it across the two crates. However, I can also make a private copy of it in arrow-ipc if reviewers feel it would be better to avoid a new API. (A hedged sketch of such cross-crate use follows at the end of this file's diff.)

///
/// Defaults to a value of false.
///
/// This structure is used to enforce safety in the [`ArrayDataBuilder`]
///
/// [`ArrayDataBuilder`]: super::ArrayDataBuilder
///
/// # Example
/// ```rust
/// use arrow_data::UnsafeFlag;
/// assert!(!UnsafeFlag::default().get()); // default is false
/// let mut flag = UnsafeFlag::new();
/// assert!(!flag.get()); // defaults to false
/// // can only set it to true in unsafe code
/// unsafe { flag.set(true) };
/// assert!(flag.get()); // now true
/// ```
#[derive(Debug, Clone)]
pub struct UnsafeFlag(bool);

impl UnsafeFlag {
/// Creates a new `UnsafeFlag` with the value set to `false`.
///
/// See examples on [`UnsafeFlag`]
#[inline]
pub const fn new() -> Self {
Self(false)
}

/// Sets the value of the flag to the given value
///
/// Note this can purposely only be done in `unsafe` code
///
/// # Safety
///
/// If set, the flag will be set to the given value. There is nothing
/// immediately unsafe about doing so, however, the flag can be used to
/// subsequently bypass safety checks in the [`ArrayDataBuilder`].
#[inline]
pub unsafe fn set(&mut self, val: bool) {
self.0 = val;
}

/// Returns the value of the flag
#[inline]
pub fn get(&self) -> bool {
self.0
}
}
// Manual `Default` impl to make it clear you cannot construct an `UnsafeFlag` that is set to `true`
impl Default for UnsafeFlag {
fn default() -> Self {
Self::new()
}
}
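
To make the cross-crate use described in the author's comment above concrete, here is a minimal hypothetical sketch of how another crate (for example arrow-ipc) could hold the now-public flag to gate validation. Only the UnsafeFlag API shown in this diff (new, set, get, Default) is real; HypotheticalReader and its methods are invented for illustration and are not part of this PR.

// --- Hypothetical sketch, not part of this PR ---
use arrow_data::UnsafeFlag;

/// An invented reader type showing one way a crate could carry the flag.
struct HypotheticalReader {
    skip_validation: UnsafeFlag,
}

impl HypotheticalReader {
    fn new() -> Self {
        // `UnsafeFlag` defaults to `false`, so validation stays enabled
        // unless the caller explicitly opts out in `unsafe` code.
        Self {
            skip_validation: UnsafeFlag::new(),
        }
    }

    /// # Safety
    /// The caller must guarantee the underlying data is valid, because the
    /// flag is later used to bypass validation checks.
    unsafe fn with_skip_validation(mut self, skip: bool) -> Self {
        unsafe { self.skip_validation.set(skip) };
        self
    }

    /// Returns `true` if validation should still be performed.
    fn should_validate(&self) -> bool {
        !self.skip_validation.get()
    }
}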

161 changes: 123 additions & 38 deletions arrow-ipc/benches/ipc_reader.rs
@@ -24,25 +24,34 @@ use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
use arrow_ipc::{root_as_footer, Block, CompressionType};
use arrow_schema::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use std::io::{Cursor, Write};
use std::sync::Arc;
use tempfile::tempdir;

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("arrow_ipc_reader");

group.bench_function("StreamReader/read_10", |b| {
let buffer = ipc_stream();
Comment (Contributor Author): I added new versions of each benchmark that work with disabled validation.

b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

group.bench_function("StreamReader/no_validation/read_10", |b| {
let buffer = ipc_stream();
b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
unsafe {
// safety: we created a valid IPC file
reader = reader.with_skip_validation(true);
}
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
@@ -51,69 +60,100 @@ fn criterion_benchmark(c: &mut Criterion) {
});

group.bench_function("StreamReader/read_10/zstd", |b| {
let buffer = ipc_stream_zstd();
b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

group.bench_function("StreamReader/no_validation/read_10/zstd", |b| {
let buffer = ipc_stream_zstd();
b.iter(move || {
let projection = None;
let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
unsafe {
// safety: we created a valid IPC file
reader = reader.with_skip_validation(true);
}
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

// --- Create IPC File ---
group.bench_function("FileReader/read_10", |b| {
let buffer = ipc_file();
b.iter(move || {
let projection = None;
let cursor = Cursor::new(buffer.as_slice());
let mut reader = FileReader::try_new(cursor, projection).unwrap();
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

group.bench_function("FileReader/no_validation/read_10", |b| {
let buffer = ipc_file();
b.iter(move || {
let projection = None;
let cursor = Cursor::new(buffer.as_slice());
let mut reader = FileReader::try_new(cursor, projection).unwrap();
unsafe {
// safety: we created a valid IPC file
reader = reader.with_skip_validation(true);
}
for _ in 0..10 {
reader.next().unwrap().unwrap();
}
assert!(reader.next().is_none());
})
});

// write to an actual file
let dir = tempdir().unwrap();
let path = dir.path().join("test.arrow");
let mut file = std::fs::File::create(&path).unwrap();
file.write_all(&ipc_file()).unwrap();
drop(file);

group.bench_function("FileReader/read_10/mmap", |b| {
let path = &path;
b.iter(move || {
let ipc_file = std::fs::File::open(path).expect("failed to open file");
let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap file") };

// Convert the mmap region to an Arrow `Buffer` to back the arrow arrays.
let bytes = bytes::Bytes::from_owner(mmap);
let buffer = Buffer::from(bytes);
let decoder = IPCBufferDecoder::new(buffer);
assert_eq!(decoder.num_batches(), 10);

for i in 0..decoder.num_batches() {
decoder.get_batch(i);
}
})
});

group.bench_function("FileReader/no_validation/read_10/mmap", |b| {
let path = &path;
b.iter(move || {
let ipc_file = std::fs::File::open(path).expect("failed to open file");
let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap file") };

// Convert the mmap region to an Arrow `Buffer` to back the arrow arrays.
let bytes = bytes::Bytes::from_owner(mmap);
let buffer = Buffer::from(bytes);
let decoder = IPCBufferDecoder::new(buffer);
let decoder = unsafe { decoder.with_skip_validation(true) };
assert_eq!(decoder.num_batches(), 10);

for i in 0..decoder.num_batches() {
@@ -123,6 +163,46 @@ fn criterion_benchmark(c: &mut Criterion) {
});
}

/// Return an IPC stream with 10 record batches
fn ipc_stream() -> Vec<u8> {
let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let mut writer = StreamWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
buffer
}

/// Return an IPC stream with ZSTD compression with 10 record batches
fn ipc_stream_zstd() -> Vec<u8> {
Comment (Contributor): Minor nit, but I wonder if we could have an ipc_stream_options that takes an IpcWriteOptions and then have ipc_stream just call through to this.

Comment (Contributor Author): this was a good suggestion -- I did it in 7d3f020 🧹 (a sketch of what that helper might look like follows after this function).

let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let options = IpcWriteOptions::default()
.try_with_compression(Some(CompressionType::ZSTD))
.unwrap();
let mut writer =
StreamWriter::try_new_with_options(&mut buffer, batch.schema().as_ref(), options).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
buffer
}
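
Following up on the review exchange above, here is a rough sketch of the suggested consolidation: one helper, named ipc_stream_options as proposed in the review, that takes an IpcWriteOptions and writes the same 10 batches, with ipc_stream and ipc_stream_zstd reduced to thin wrappers. This is an illustrative guess at the shape of the change; the actual refactor landed in 7d3f020 and its naming and details may differ.

// --- Sketch of the suggested refactor (assumed shape; see 7d3f020 for the real change) ---
fn ipc_stream_options(options: IpcWriteOptions) -> Vec<u8> {
    let batch = create_batch(8192, true);
    let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
    let mut writer =
        StreamWriter::try_new_with_options(&mut buffer, batch.schema().as_ref(), options).unwrap();
    // Write the same 10 batches as the existing helpers.
    for _ in 0..10 {
        writer.write(&batch).unwrap();
    }
    writer.finish().unwrap();
    buffer
}

// The existing helpers would then delegate to it:
// fn ipc_stream() -> Vec<u8> { ipc_stream_options(IpcWriteOptions::default()) }
// fn ipc_stream_zstd() -> Vec<u8> {
//     let options = IpcWriteOptions::default()
//         .try_with_compression(Some(CompressionType::ZSTD))
//         .unwrap();
//     ipc_stream_options(options)
// }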

/// Return an IPC file with 10 record batches
fn ipc_file() -> Vec<u8> {
let batch = create_batch(8192, true);
let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
let mut writer = FileWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
for _ in 0..10 {
writer.write(&batch).unwrap();
}
writer.finish().unwrap();
buffer
}

// copied from the zero_copy_ipc example.
// should we move this to an actual API?
/// Wrapper around the example in the `FileDecoder` which handles the
Expand Down Expand Up @@ -166,6 +246,11 @@ impl IPCBufferDecoder {
}
}

/// Sets `skip_validation` on the inner `FileDecoder`.
///
/// # Safety
/// The caller must ensure the IPC data being decoded is valid.
unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
self.decoder = self.decoder.with_skip_validation(skip_validation);
self
}

fn num_batches(&self) -> usize {
self.batches.len()
}