Skip to content

Commit a4c17a9

Browse files
committed
Review feedback
1 parent 9c07441 commit a4c17a9

File tree

1 file changed

+51
-20
lines changed
  • parquet/src/arrow/arrow_writer

1 file changed

+51
-20
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
//! Contains writer which writes arrow data into parquet data.
1919
2020
use bytes::Bytes;
21-
use std::fmt::Debug;
2221
use std::io::{Read, Write};
2322
use std::iter::Peekable;
2423
use std::slice::Iter;
@@ -49,7 +48,7 @@ use crate::errors::{ParquetError, Result};
4948
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
5049
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
5150
use crate::file::reader::{ChunkReader, Length};
52-
use crate::file::writer::SerializedFileWriter;
51+
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
5352
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
5453
use levels::{calculate_array_levels, ArrayLevels};
5554

@@ -99,7 +98,7 @@ pub struct ArrowWriter<W: Write> {
9998
max_row_group_size: usize,
10099
}
101100

102-
impl<W: Write + Send> Debug for ArrowWriter<W> {
101+
impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
103102
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104103
let buffered_memory = self.in_progress_size();
105104
f.debug_struct("ArrowWriter")
@@ -210,8 +209,8 @@ impl<W: Write + Send> ArrowWriter<W> {
210209
};
211210

212211
let mut row_group_writer = self.writer.next_row_group()?;
213-
for (chunk, close) in in_progress.close()? {
214-
row_group_writer.append_column(&chunk, close)?;
212+
for chunk in in_progress.close()? {
213+
chunk.append_to_row_group(&mut row_group_writer)?;
215214
}
216215
row_group_writer.close()?;
217216
Ok(())
@@ -250,18 +249,18 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
250249

251250
/// A single column chunk produced by [`ArrowColumnWriter`]
252251
#[derive(Default)]
253-
pub struct ArrowColumnChunk {
252+
struct ArrowColumnChunkData {
254253
length: usize,
255254
data: Vec<Bytes>,
256255
}
257256

258-
impl Length for ArrowColumnChunk {
257+
impl Length for ArrowColumnChunkData {
259258
fn len(&self) -> u64 {
260259
self.length as _
261260
}
262261
}
263262

264-
impl ChunkReader for ArrowColumnChunk {
263+
impl ChunkReader for ArrowColumnChunkData {
265264
type T = ArrowColumnChunkReader;
266265

267266
fn get_read(&self, start: u64) -> Result<Self::T> {
@@ -276,8 +275,8 @@ impl ChunkReader for ArrowColumnChunk {
276275
}
277276
}
278277

279-
/// A [`Read`] for [`ArrowColumnChunk`]
280-
pub struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
278+
/// A [`Read`] for [`ArrowColumnChunkData`]
279+
struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
281280

282281
impl Read for ArrowColumnChunkReader {
283282
fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
@@ -299,11 +298,11 @@ impl Read for ArrowColumnChunkReader {
299298
}
300299
}
301300

302-
/// A shared [`ArrowColumnChunk`]
301+
/// A shared [`ArrowColumnChunkData`]
303302
///
304303
/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
305304
/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
306-
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;
305+
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
307306

308307
#[derive(Default)]
309308
struct ArrowPageWriter {
@@ -350,6 +349,7 @@ impl PageWriter for ArrowPageWriter {
350349
}
351350

352351
/// A leaf column that can be encoded by [`ArrowColumnWriter`]
352+
#[derive(Debug)]
353353
pub struct ArrowLeafColumn(ArrayLevels);
354354

355355
/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
@@ -358,7 +358,31 @@ pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafCo
358358
Ok(levels.into_iter().map(ArrowLeafColumn).collect())
359359
}
360360

361-
/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
361+
/// The data for a single column chunk, see [`ArrowColumnWriter`]
362+
pub struct ArrowColumnChunk {
363+
data: ArrowColumnChunkData,
364+
close: ColumnCloseResult,
365+
}
366+
367+
impl std::fmt::Debug for ArrowColumnChunk {
368+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
369+
f.debug_struct("ArrowColumnChunk")
370+
.field("length", &self.data.length)
371+
.finish_non_exhaustive()
372+
}
373+
}
374+
375+
impl ArrowColumnChunk {
376+
/// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
377+
pub fn append_to_row_group<W: Write + Send>(
378+
self,
379+
writer: &mut SerializedRowGroupWriter<'_, W>,
380+
) -> Result<()> {
381+
writer.append_column(&self.data, self.close)
382+
}
383+
}
384+
385+
/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunkData`]
362386
///
363387
/// Note: This is a low-level interface for applications that require fine-grained control
364388
/// of encoding, see [`ArrowWriter`] for a higher-level interface
@@ -426,8 +450,8 @@ pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafCo
426450
/// // Finish up parallel column encoding
427451
/// for (handle, send) in workers {
428452
/// drop(send); // Drop send side to signal termination
429-
/// let (chunk, result) = handle.join().unwrap().unwrap();
430-
/// row_group.append_column(&chunk, result).unwrap();
453+
/// let chunk = handle.join().unwrap().unwrap();
454+
/// chunk.append_to_row_group(&mut row_group).unwrap();
431455
/// }
432456
/// row_group.close().unwrap();
433457
///
@@ -439,6 +463,12 @@ pub struct ArrowColumnWriter {
439463
chunk: SharedColumnChunk,
440464
}
441465

466+
impl std::fmt::Debug for ArrowColumnWriter {
467+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
468+
f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
469+
}
470+
}
471+
442472
enum ArrowColumnWriterImpl {
443473
ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
444474
Column(ColumnWriter<'static>),
@@ -458,14 +488,15 @@ impl ArrowColumnWriter {
458488
Ok(())
459489
}
460490

461-
/// Close this column returning the [`ArrowColumnChunk`] and [`ColumnCloseResult`]
462-
pub fn close(self) -> Result<(ArrowColumnChunk, ColumnCloseResult)> {
463-
let result = match self.writer {
491+
/// Close this column returning the written [`ArrowColumnChunk`]
492+
pub fn close(self) -> Result<ArrowColumnChunk> {
493+
let close = match self.writer {
464494
ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
465495
ArrowColumnWriterImpl::Column(c) => c.close()?,
466496
};
467497
let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
468-
Ok((chunk.into_inner().unwrap(), result))
498+
let data = chunk.into_inner().unwrap();
499+
Ok(ArrowColumnChunk { data, close })
469500
}
470501

471502
/// Returns the estimated total bytes for this column writer
@@ -509,7 +540,7 @@ impl ArrowRowGroupWriter {
509540
Ok(())
510541
}
511542

512-
fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
543+
fn close(self) -> Result<Vec<ArrowColumnChunk>> {
513544
self.writers
514545
.into_iter()
515546
.map(|writer| writer.close())

0 commit comments

Comments
 (0)