diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 1e1054c9a063..b0e81c499099 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -431,7 +431,7 @@ impl ChunkReader for ArrowColumnChunkData { )) } - fn get_bytes(&self, _start: u64, _length: usize) -> Result { + fn get_bytes(&self, _start: u64, _length: u64) -> Result { unimplemented!() } } diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 705baa65824d..37a02d698cee 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -83,7 +83,7 @@ pub trait MetadataSuffixFetch: MetadataFetch { /// /// Note the returned type is a boxed future, often created by /// [FutureExt::boxed]. See the trait documentation for an example - fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result>; + fn fetch_suffix(&mut self, suffix: u64) -> BoxFuture<'_, Result>; } /// An asynchronous interface to load [`ParquetMetaData`] from an async source @@ -93,7 +93,7 @@ pub struct MetadataLoader { /// The in-progress metadata metadata: ParquetMetaData, /// The offset and bytes of remaining unparsed data - remainder: Option<(usize, Bytes)>, + remainder: Option<(u64, Bytes)>, } impl MetadataLoader { @@ -125,7 +125,7 @@ impl MetadataLoader { footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]); let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?; - let length = footer.metadata_length(); + let length = footer.metadata_length() as usize; if file_size < length + FOOTER_SIZE { return Err(ParquetError::EOF(format!( @@ -146,6 +146,7 @@ impl MetadataLoader { let metadata_start = file_size - length - FOOTER_SIZE - footer_start; let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE]; + let footer_start: u64 = footer_start.try_into()?; ( ParquetMetaDataReader::decode_metadata(slice)?, Some((footer_start, suffix.slice(..metadata_start))), @@ -294,6 +295,8 @@ where F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, { + let prefetch = prefetch.map(|v| v as u64); + let file_size = file_size as u64; let fetch = MetadataFetchFn(fetch); ParquetMetaDataReader::new() .with_prefetch_hint(prefetch) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 8879619e803a..444a59d849b6 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -138,7 +138,7 @@ impl AsyncFileReader for Box { } impl MetadataSuffixFetch for T { - fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result> { + fn fetch_suffix(&mut self, suffix: u64) -> BoxFuture<'_, Result> { async move { self.seek(SeekFrom::End(-(suffix as i64))).await?; let mut buf = Vec::with_capacity(suffix); @@ -1057,7 +1057,8 @@ impl ChunkReader for ColumnChunkData { Ok(self.get(start)?.reader()) } - fn get_bytes(&self, start: u64, length: usize) -> Result { + fn get_bytes(&self, start: u64, length: u64) -> Result { + let length: usize = length.try_into()?; Ok(self.get(start)?.slice(..length)) } } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 0b79a3d6a947..a7cc32d8372b 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -161,9 +161,9 @@ impl ParquetObjectReader { } impl MetadataSuffixFetch for &mut ParquetObjectReader { - fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result> { + fn fetch_suffix(&mut self, suffix: u64) -> BoxFuture<'_, Result> { let options = GetOptions { - range: Some(GetRange::Suffix(suffix as u64)), + range: Some(GetRange::Suffix(suffix)), ..Default::default() }; self.spawn(|store, path| { diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index 69ef4538baa1..6eae63e9202a 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -186,7 +186,7 @@ impl std::ops::IndexMut for Block { #[derive(Debug, Clone)] pub struct Sbbf(Vec); -pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20; +pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: u64 = 20; /// given an initial offset, and a byte buffer, try to read out a bloom filter header and return /// both the header and the offset after it (for bitset). @@ -317,7 +317,10 @@ impl Sbbf { }; let buffer = match column_metadata.bloom_filter_length() { - Some(length) => reader.get_bytes(offset, length as usize), + Some(length) => { + let length: u64 = length.try_into()?; + reader.get_bytes(offset, length) + } None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE), }?; @@ -343,7 +346,7 @@ impl Sbbf { let bitset = match column_metadata.bloom_filter_length() { Some(_) => buffer.slice((bitset_offset - offset) as usize..), None => { - let bitset_length: usize = header.num_bytes.try_into().map_err(|_| { + let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| { ParquetError::General("Bloom filter length is invalid".to_string()) })?; reader.get_bytes(bitset_offset, bitset_length)? diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs index 4cb1f99c3cf6..384d52ad4106 100644 --- a/parquet/src/errors.rs +++ b/parquet/src/errors.rs @@ -46,12 +46,12 @@ pub enum ParquetError { ArrowError(String), /// Error when the requested column index is more than the /// number of columns in the row group - IndexOutOfBound(usize, usize), + IndexOutOfBound(u64, u64), /// An external error variant External(Box), - /// Returned when a function needs more data to complete properly. The `usize` field indicates + /// Returned when a function needs more data to complete properly. The `u64` field indicates /// the total number of bytes required, not the number of additional bytes. - NeedMoreData(usize), + NeedMoreData(u64), } impl std::fmt::Display for ParquetError { diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index 85ef30cd0ecc..a9c642cbaa1a 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -76,6 +76,6 @@ pub fn decode_metadata(buf: &[u8]) -> Result { since = "53.1.0", note = "Use ParquetMetaDataReader::decode_footer_tail" )] -pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result { +pub fn decode_footer(slice: &[u8; FOOTER_SIZE as usize]) -> Result { ParquetMetaDataReader::decode_footer_tail(slice).map(|f| f.metadata_length()) } diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 233a55778721..ea9976f3a746 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -1103,9 +1103,9 @@ impl ColumnChunkMetaData { } /// Returns the range for the offset index if any - pub(crate) fn column_index_range(&self) -> Option> { - let offset = usize::try_from(self.column_index_offset?).ok()?; - let length = usize::try_from(self.column_index_length?).ok()?; + pub(crate) fn column_index_range(&self) -> Option> { + let offset = u64::try_from(self.column_index_offset?).ok()?; + let length = u64::try_from(self.column_index_length?).ok()?; Some(offset..(offset + length)) } @@ -1120,9 +1120,9 @@ impl ColumnChunkMetaData { } /// Returns the range for the offset index if any - pub(crate) fn offset_index_range(&self) -> Option> { - let offset = usize::try_from(self.offset_index_offset?).ok()?; - let length = usize::try_from(self.offset_index_length?).ok()?; + pub(crate) fn offset_index_range(&self) -> Option> { + let offset = u64::try_from(self.offset_index_offset?).ok()?; + let length = u64::try_from(self.offset_index_length?).ok()?; Some(offset..(offset + length)) } diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 590f7809a492..99c3300dfd90 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -72,10 +72,10 @@ pub struct ParquetMetaDataReader { metadata: Option, column_index: bool, offset_index: bool, - prefetch_hint: Option, - // Size of the serialized thrift metadata plus the 8 byte footer. Only set if - // `self.parse_metadata` is called. - metadata_size: Option, + prefetch_hint: Option, + /// Size of the serialized thrift metadata plus the 8 byte footer. Only set if + /// `self.parse_metadata` is called. + metadata_size: Option, #[cfg(feature = "encryption")] file_decryption_properties: Option, } @@ -84,13 +84,13 @@ pub struct ParquetMetaDataReader { /// /// This is parsed from the last 8 bytes of the Parquet file pub struct FooterTail { - metadata_length: usize, + metadata_length: u64, encrypted_footer: bool, } impl FooterTail { /// The length of the footer metadata in bytes - pub fn metadata_length(&self) -> usize { + pub fn metadata_length(&self) -> u64 { self.metadata_length } @@ -151,7 +151,7 @@ impl ParquetMetaDataReader { /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result /// in extra fetches being performed. - pub fn with_prefetch_hint(mut self, prefetch: Option) -> Self { + pub fn with_prefetch_hint(mut self, prefetch: Option) -> Self { self.prefetch_hint = prefetch; self } @@ -209,7 +209,7 @@ impl ParquetMetaDataReader { /// the request, and must include the Parquet footer. If page indexes are desired, the buffer /// must contain the entire file, or [`Self::try_parse_sized()`] should be used. pub fn try_parse(&mut self, reader: &R) -> Result<()> { - self.try_parse_sized(reader, reader.len() as usize) + self.try_parse_sized(reader, reader.len()) } /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader` @@ -284,13 +284,13 @@ impl ParquetMetaDataReader { /// } /// let metadata = reader.finish().unwrap(); /// ``` - pub fn try_parse_sized(&mut self, reader: &R, file_size: usize) -> Result<()> { + pub fn try_parse_sized(&mut self, reader: &R, file_size: u64) -> Result<()> { self.metadata = match self.parse_metadata(reader) { Ok(metadata) => Some(metadata), Err(ParquetError::NeedMoreData(needed)) => { // If reader is the same length as `file_size` then presumably there is no more to // read, so return an EOF error. - if file_size == reader.len() as usize || needed > file_size { + if file_size == reader.len() || needed > file_size { return Err(eof_err!( "Parquet file too small. Size is {} but need {}", file_size, @@ -315,7 +315,7 @@ impl ParquetMetaDataReader { /// Read the page index structures when a [`ParquetMetaData`] has already been obtained. /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. pub fn read_page_indexes(&mut self, reader: &R) -> Result<()> { - self.read_page_indexes_sized(reader, reader.len() as usize) + self.read_page_indexes_sized(reader, reader.len()) } /// Read the page index structures when a [`ParquetMetaData`] has already been obtained. @@ -326,7 +326,7 @@ impl ParquetMetaDataReader { pub fn read_page_indexes_sized( &mut self, reader: &R, - file_size: usize, + file_size: u64, ) -> Result<()> { if self.metadata.is_none() { return Err(general_err!( @@ -350,7 +350,7 @@ impl ParquetMetaDataReader { // Check to see if needed range is within `file_range`. Checking `range.end` seems // redundant, but it guards against `range_for_page_index()` returning garbage. - let file_range = file_size.saturating_sub(reader.len() as usize)..file_size; + let file_range = file_size.saturating_sub(reader.len())..file_size; if !(file_range.contains(&range.start) && file_range.contains(&range.end)) { // Requested range starts beyond EOF if range.end > file_size { @@ -378,7 +378,7 @@ impl ParquetMetaDataReader { } let bytes_needed = range.end - range.start; - let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?; + let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?; let offset = range.start; self.parse_column_index(&bytes, offset)?; @@ -397,7 +397,7 @@ impl ParquetMetaDataReader { pub async fn load_and_finish( mut self, fetch: F, - file_size: usize, + file_size: u64, ) -> Result { self.try_load(fetch, file_size).await?; self.finish() @@ -423,11 +423,7 @@ impl ParquetMetaDataReader { /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches /// performed by this function. #[cfg(all(feature = "async", feature = "arrow"))] - pub async fn try_load( - &mut self, - mut fetch: F, - file_size: usize, - ) -> Result<()> { + pub async fn try_load(&mut self, mut fetch: F, file_size: u64) -> Result<()> { let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?; self.metadata = Some(metadata); @@ -473,7 +469,7 @@ impl ParquetMetaDataReader { async fn load_page_index_with_remainder( &mut self, mut fetch: F, - remainder: Option<(usize, Bytes)>, + remainder: Option<(u64, Bytes)>, ) -> Result<()> { if self.metadata.is_none() { return Err(general_err!("Footer metadata is not present")); @@ -507,7 +503,8 @@ impl ParquetMetaDataReader { Ok(()) } - fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> { + fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> { + let start_offset: usize = start_offset.try_into()?; let metadata = self.metadata.as_mut().unwrap(); if self.column_index { let index = metadata @@ -531,7 +528,8 @@ impl ParquetMetaDataReader { Ok(()) } - fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> { + fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> { + let start_offset: usize = start_offset.try_into()?; let metadata = self.metadata.as_mut().unwrap(); if self.offset_index { let index = metadata @@ -555,7 +553,7 @@ impl ParquetMetaDataReader { Ok(()) } - fn range_for_page_index(&self) -> Option> { + fn range_for_page_index(&self) -> Option> { // sanity check self.metadata.as_ref()?; @@ -592,7 +590,7 @@ impl ParquetMetaDataReader { let footer_metadata_len = FOOTER_SIZE + metadata_len; self.metadata_size = Some(footer_metadata_len); - if footer_metadata_len > file_size as usize { + if footer_metadata_len > file_size { return Err(ParquetError::NeedMoreData(footer_metadata_len)); } @@ -607,7 +605,7 @@ impl ParquetMetaDataReader { /// been provided, then return that value if it is larger than the size of the Parquet /// file footer (8 bytes). Otherwise returns `8`. #[cfg(all(feature = "async", feature = "arrow"))] - fn get_prefetch_size(&self) -> usize { + fn get_prefetch_size(&self) -> u64 { if let Some(prefetch) = self.prefetch_hint { if prefetch > FOOTER_SIZE { return prefetch; @@ -620,8 +618,8 @@ impl ParquetMetaDataReader { async fn load_metadata( &self, fetch: &mut F, - file_size: usize, - ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> { + file_size: u64, + ) -> Result<(ParquetMetaData, Option<(u64, Bytes)>)> { let prefetch = self.get_prefetch_size(); if file_size < FOOTER_SIZE { @@ -679,7 +677,7 @@ impl ParquetMetaDataReader { async fn load_metadata_via_suffix( &self, fetch: &mut F, - ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> { + ) -> Result<(ParquetMetaData, Option<(u64, Bytes)>)> { let prefetch = self.get_prefetch_size(); let suffix = fetch.fetch_suffix(prefetch as _).await?; @@ -747,18 +745,18 @@ impl ParquetMetaDataReader { } else { return Err(general_err!("Invalid Parquet file. Corrupt footer")); }; - // get the metadata length from the footer + // get the metadata length from the footer (infallible) let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap()); + let metadata_length = metadata_len as u64; Ok(FooterTail { - // u32 won't be larger than usize in most cases - metadata_length: metadata_len as usize, + metadata_length, encrypted_footer, }) } /// Decodes the Parquet footer, returning the metadata length in bytes #[deprecated(note = "use decode_footer_tail instead")] - pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result { + pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result { Self::decode_footer_tail(slice).map(|f| f.metadata_length) } @@ -1091,7 +1089,7 @@ mod tests { #[test] fn test_try_parse() { let file = get_test_file("alltypes_tiny_pages.parquet"); - let len = file.len() as usize; + let len = file.len(); let mut reader = ParquetMetaDataReader::new().with_page_indexes(true); diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index fd3639ac3069..bb77b94e78cc 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -31,7 +31,18 @@ use std::ops::Range; /// Computes the covering range of two optional ranges /// /// For example `acc_range(Some(7..9), Some(1..3)) = Some(1..9)` -pub(crate) fn acc_range(a: Option>, b: Option>) -> Option> { +pub(crate) fn acc_range(a: Option>, b: Option>) -> Option> { + match (a, b) { + (Some(a), Some(b)) => Some(a.start.min(b.start)..a.end.max(b.end)), + (None, x) | (x, None) => x, + } +} + +/// Computes the covering range of two optional ranges of `usize` +pub(crate) fn acc_range_usize( + a: Option>, + b: Option>, +) -> Option> { match (a, b) { (Some(a), Some(b)) => Some(a.start.min(b.start)..a.end.max(b.end)), (None, x) | (x, None) => x, diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs index 400441f0c9cd..e1c8c47b277a 100644 --- a/parquet/src/file/reader.rs +++ b/parquet/src/file/reader.rs @@ -19,11 +19,6 @@ //! readers to read individual column chunks, or access record //! iterator. -use bytes::{Buf, Bytes}; -use std::fs::File; -use std::io::{BufReader, Seek, SeekFrom}; -use std::{io::Read, sync::Arc}; - use crate::bloom_filter::Sbbf; use crate::column::page::PageIterator; use crate::column::{page::PageReader, reader::ColumnReader}; @@ -32,6 +27,10 @@ use crate::file::metadata::*; pub use crate::file::serialized_reader::{SerializedFileReader, SerializedPageReader}; use crate::record::reader::RowIter; use crate::schema::types::Type as SchemaType; +use bytes::{Buf, Bytes}; +use std::fs::File; +use std::io::{BufReader, Seek, SeekFrom}; +use std::{io::Read, sync::Arc}; use crate::basic::Type; @@ -77,7 +76,7 @@ pub trait ChunkReader: Length + Send + Sync { /// /// Similarly to [`Self::get_read`], this method may have side-effects on /// previously returned readers. - fn get_bytes(&self, start: u64, length: usize) -> Result; + fn get_bytes(&self, start: u64, length: u64) -> Result; } impl Length for File { @@ -95,11 +94,13 @@ impl ChunkReader for File { Ok(BufReader::new(self.try_clone()?)) } - fn get_bytes(&self, start: u64, length: usize) -> Result { - let mut buffer = Vec::with_capacity(length); + fn get_bytes(&self, start: u64, length: u64) -> Result { + let cap: usize = length.try_into()?; + let mut buffer = Vec::with_capacity(cap); let mut reader = self.try_clone()?; reader.seek(SeekFrom::Start(start))?; let read = reader.take(length as _).read_to_end(&mut buffer)?; + let read = read as u64; if read != length { return Err(eof_err!( @@ -126,8 +127,9 @@ impl ChunkReader for Bytes { Ok(self.slice(start..).reader()) } - fn get_bytes(&self, start: u64, length: usize) -> Result { - let start = start as usize; + fn get_bytes(&self, start: u64, length: u64) -> Result { + let start: usize = start.try_into()?; + let length: usize = length.try_into()?; Ok(self.slice(start..start + length)) } }