diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 090b9514dc2d..7bcc5503823d 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -628,10 +628,14 @@ impl ChunkReader for ColumnChunkData {
     type T = bytes::buf::Reader<Bytes>;
 
     fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+        Ok(self.get_bytes(start, length)?.reader())
+    }
+
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
         match &self {
             ColumnChunkData::Sparse { data, .. } => data
                 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
-                .map(|idx| data[idx].1.slice(0..length).reader())
+                .map(|idx| data[idx].1.slice(0..length))
                 .map_err(|_| {
                     ParquetError::General(format!(
                         "Invalid offset in sparse column chunk data: {}",
@@ -641,7 +645,7 @@ impl ChunkReader for ColumnChunkData {
             ColumnChunkData::Dense { offset, data } => {
                 let start = start as usize - *offset;
                 let end = start + length;
-                Ok(data.slice(start..end).reader())
+                Ok(data.slice(start..end))
             }
         }
     }
diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs
index dc1d66d0fa44..93c713417c87 100644
--- a/parquet/src/file/footer.rs
+++ b/parquet/src/file/footer.rs
@@ -62,19 +62,8 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData> {
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ ... @@ pub trait ChunkReader: Length + Send + Sync {
     fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
+
+    /// Get a range as bytes
+    /// This should fail if the exact number of bytes cannot be read
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
+        let mut buffer = Vec::with_capacity(length);
+        let read = self.get_read(start, length)?.read_to_end(&mut buffer)?;
+
+        if read != length {
+            return Err(eof_err!(
+                "Expected to read {} bytes, read only {}",
+                length,
+                read
+            ));
+        }
+        Ok(buffer.into())
+    }
 }
 
 // ----------------------------------------------------------------------
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 1a6a9026d5dd..f7a568e9258f 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -79,8 +79,12 @@
 impl ChunkReader for Bytes {
     type T = bytes::buf::Reader<Bytes>;
 
     fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+        Ok(self.get_bytes(start, length)?.reader())
+    }
+
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
         let start = start as usize;
-        Ok(self.slice(start..start + length).reader())
+        Ok(self.slice(start..start + length))
     }
 }
@@ -623,26 +627,13 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
 
                 let page_len = front.compressed_page_size as usize;
 
-                // TODO: Add ChunkReader get_bytes to potentially avoid copy
-                let mut buffer = Vec::with_capacity(page_len);
-                let read = self
-                    .reader
-                    .get_read(front.offset as u64, page_len)?
-                    .read_to_end(&mut buffer)?;
-
-                if read != page_len {
-                    return Err(eof_err!(
-                        "Expected to read {} bytes of page, read only {}",
-                        page_len,
-                        read
-                    ));
-                }
+                let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;
 
-                let mut cursor = Cursor::new(buffer);
+                let mut cursor = Cursor::new(buffer.as_ref());
                 let header = read_page_header(&mut cursor)?;
                 let offset = cursor.position();
 
-                let bytes = Bytes::from(cursor.into_inner()).slice(offset as usize..);
+                let bytes = buffer.slice(offset as usize..);
                 decode_page(
                     header,
                     bytes.into(),