@@ -530,8 +530,9 @@ impl ArrowColumnChunk {
 
 /// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
 ///
-/// Note: This is a low-level interface for applications that require fine-grained control
-/// of encoding, see [`ArrowWriter`] for a higher-level interface
+/// Note: This is a low-level interface for applications that require
+/// fine-grained control of encoding (e.g. encoding using multiple threads),
+/// see [`ArrowWriter`] for a higher-level interface
 ///
 /// # Example: Encoding two Arrow Arrays in Parallel
 /// ```
@@ -540,9 +541,9 @@ impl ArrowColumnChunk {
 /// # use arrow_array::*;
 /// # use arrow_schema::*;
 /// # use parquet::arrow::ArrowSchemaConverter;
-/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
+/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
 /// # use parquet::file::properties::WriterProperties;
-/// # use parquet::file::writer::SerializedFileWriter;
+/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
 /// #
 /// let schema = Arc::new(Schema::new(vec![
 ///     Field::new("i32", DataType::Int32, false),
@@ -560,15 +561,20 @@ impl ArrowColumnChunk {
 /// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
 ///
 /// // Spawn a worker thread for each column
-/// // This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better
+/// //
+/// // Note: This is for demonstration purposes; a thread pool (e.g. rayon or tokio) would be better.
+/// // The `map` produces an iterator of (thread handle, send channel) tuples.
 /// let mut workers: Vec<_> = col_writers
 ///     .into_iter()
 ///     .map(|mut col_writer| {
 ///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
 ///         let handle = std::thread::spawn(move || {
+///             // Receive Arrays to encode via the channel
 ///             for col in recv {
 ///                 col_writer.write(&col)?;
 ///             }
+///             // Once the input is complete, close the writer
+///             // to return the newly created ArrowColumnChunk
 ///             col_writer.close()
 ///         });
 ///         (handle, send)
@@ -577,33 +583,40 @@ impl ArrowColumnChunk {
 ///
 /// // Create parquet writer
 /// let root_schema = parquet_schema.root_schema_ptr();
-/// let mut out = Vec::with_capacity(1024); // This could be a File
-/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap();
+/// // write to memory in this example, but this could also be a File
+/// let mut out = Vec::with_capacity(1024);
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
+///     .unwrap();
 ///
 /// // Start row group
-/// let mut row_group = writer.next_row_group().unwrap();
+/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
+///     .next_row_group()
+///     .unwrap();
 ///
-/// // Columns to encode
+/// // Create some example input columns to encode
 /// let to_write = vec![
 ///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
 ///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
 /// ];
 ///
-/// // Spawn work to encode columns
+/// // Send the input columns to the workers
 /// let mut worker_iter = workers.iter_mut();
 /// for (arr, field) in to_write.iter().zip(&schema.fields) {
 ///     for leaves in compute_leaves(field, arr).unwrap() {
 ///         worker_iter.next().unwrap().1.send(leaves).unwrap();
 ///     }
 /// }
 ///
-/// // Finish up parallel column encoding
+/// // Wait for the workers to complete encoding, and append
+/// // the resulting column chunks to the row group (and the file)
 /// for (handle, send) in workers {
 ///     drop(send); // Drop send side to signal termination
-///     let chunk = handle.join().unwrap().unwrap();
-///     chunk.append_to_row_group(&mut row_group).unwrap();
+///     // Wait for the worker to finish and return the completed chunk
+///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
+///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
 /// }
-/// row_group.close().unwrap();
+/// // Close the row group, which writes to the underlying file
+/// row_group_writer.close().unwrap();
 ///
 /// let metadata = writer.close().unwrap();
 /// assert_eq!(metadata.num_rows, 3);
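For contrast with the low-level API shown above, the following is a minimal sketch (an illustration, not part of this change) of the higher-level [`ArrowWriter`] interface that the doc note refers to. It encodes `RecordBatch`es directly, owning the column writers and row groups internally:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::ArrowWriter;

fn main() {
    // Build a single-column batch matching the example's `i32` column
    let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_from_iter([("i32", col)]).unwrap();

    // ArrowWriter drives all column encoding internally, so there are
    // no worker threads or channels to manage
    let mut out = Vec::new(); // in-memory buffer; could also be a File
    let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    let metadata = writer.close().unwrap();
    assert_eq!(metadata.num_rows, 3);
}
```

This trades the per-column parallelism of the low-level API for far less orchestration code.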
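A small companion sketch (also illustrative, not from the diff; using `bytes::Bytes` as the `ChunkReader` input is an assumption) of how the in-memory buffer produced by either approach could be read back to check the round trip:

```rust
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn verify(out: Vec<u8>) {
    // Bytes implements ChunkReader, so the written buffer can be read directly
    let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(out))
        .unwrap()
        .build()
        .unwrap();

    // The reader yields Result<RecordBatch>; the example wrote 3 rows
    let batch = reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 3);
}
```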