Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ impl ChunkReader for ArrowColumnChunkData {
))
}

fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
fn get_bytes(&self, _start: u64, _length: u64) -> Result<Bytes> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This length would only have to change if we expected to be fetching more than 4GB in a single request, no?

unimplemented!()
}
}
Expand Down
9 changes: 6 additions & 3 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub trait MetadataSuffixFetch: MetadataFetch {
///
/// Note the returned type is a boxed future, often created by
/// [FutureExt::boxed]. See the trait documentation for an example
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
fn fetch_suffix(&mut self, suffix: u64) -> BoxFuture<'_, Result<Bytes>>;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise, we'll never be fetching more than 4GB in a suffix request for metadata

}

/// An asynchronous interface to load [`ParquetMetaData`] from an async source
Expand All @@ -93,7 +93,7 @@ pub struct MetadataLoader<F> {
/// The in-progress metadata
metadata: ParquetMetaData,
/// The offset and bytes of remaining unparsed data
remainder: Option<(usize, Bytes)>,
remainder: Option<(u64, Bytes)>,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an offset into the cached remainder, which is a buffer of the prefetch_size I believe. Assuming that we'll never be prefetching >4GB of data for the metadata, this shouldn't need to change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right -- we don't need to change this.

}

impl<F: MetadataFetch> MetadataLoader<F> {
Expand Down Expand Up @@ -125,7 +125,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
let length = footer.metadata_length();
let length = footer.metadata_length() as usize;

if file_size < length + FOOTER_SIZE {
return Err(ParquetError::EOF(format!(
Expand All @@ -146,6 +146,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
let metadata_start = file_size - length - FOOTER_SIZE - footer_start;

let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
let footer_start: u64 = footer_start.try_into()?;
(
ParquetMetaDataReader::decode_metadata(slice)?,
Some((footer_start, suffix.slice(..metadata_start))),
Expand Down Expand Up @@ -294,6 +295,8 @@ where
F: FnMut(Range<usize>) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
{
let prefetch = prefetch.map(|v| v as u64);
let file_size = file_size as u64;
let fetch = MetadataFetchFn(fetch);
ParquetMetaDataReader::new()
.with_prefetch_hint(prefetch)
Expand Down
5 changes: 3 additions & 2 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
}

impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
fn fetch_suffix(&mut self, suffix: u64) -> BoxFuture<'_, Result<Bytes>> {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we would also have to change the suffix length as well as all other places that use usize as an offset in a file.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we necessarily would need to change it in the signature, but we would need to ensure that we cast the suffix to u64 internally rather than casting the length to usize.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suffix is the length of the returned Bytes, so I'd imagine this should remain a usize.

async move {
self.seek(SeekFrom::End(-(suffix as i64))).await?;
let mut buf = Vec::with_capacity(suffix);
Expand Down Expand Up @@ -1057,7 +1057,8 @@ impl ChunkReader for ColumnChunkData {
Ok(self.get(start)?.reader())
}

fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
fn get_bytes(&self, start: u64, length: u64) -> Result<Bytes> {
let length: usize = length.try_into()?;
Ok(self.get(start)?.slice(..length))
}
}
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ impl ParquetObjectReader {
}

impl MetadataSuffixFetch for &mut ParquetObjectReader {
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
fn fetch_suffix(&mut self, suffix: u64) -> BoxFuture<'_, Result<Bytes>> {
let options = GetOptions {
range: Some(GetRange::Suffix(suffix as u64)),
range: Some(GetRange::Suffix(suffix)),
..Default::default()
};
self.spawn(|store, path| {
Expand Down
9 changes: 6 additions & 3 deletions parquet/src/bloom_filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl std::ops::IndexMut<usize> for Block {
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: u64 = 20;

/// given an initial offset, and a byte buffer, try to read out a bloom filter header and return
/// both the header and the offset after it (for bitset).
Expand Down Expand Up @@ -317,7 +317,10 @@ impl Sbbf {
};

let buffer = match column_metadata.bloom_filter_length() {
Some(length) => reader.get_bytes(offset, length as usize),
Some(length) => {
let length: u64 = length.try_into()?;
reader.get_bytes(offset, length)
}
None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
}?;

Expand All @@ -343,7 +346,7 @@ impl Sbbf {
let bitset = match column_metadata.bloom_filter_length() {
Some(_) => buffer.slice((bitset_offset - offset) as usize..),
None => {
let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
ParquetError::General("Bloom filter length is invalid".to_string())
})?;
reader.get_bytes(bitset_offset, bitset_length)?
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ pub enum ParquetError {
ArrowError(String),
/// Error when the requested column index is more than the
/// number of columns in the row group
IndexOutOfBound(usize, usize),
IndexOutOfBound(u64, u64),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an index into an array or vector, so this should remain usize IMO

/// An external error variant
External(Box<dyn Error + Send + Sync>),
/// Returned when a function needs more data to complete properly. The `usize` field indicates
/// Returned when a function needs more data to complete properly. The `u64` field indicates
/// the total number of bytes required, not the number of additional bytes.
NeedMoreData(usize),
NeedMoreData(u64),
Comment on lines -52 to +54
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is thrown when a provided buffer needs to have more data in it. Here too it doesn't make sense to ask for more bytes than a usize can index.

}

impl std::fmt::Display for ParquetError {
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/file/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
since = "53.1.0",
note = "Use ParquetMetaDataReader::decode_footer_tail"
)]
pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
pub fn decode_footer(slice: &[u8; FOOTER_SIZE as usize]) -> Result<u64> {
ParquetMetaDataReader::decode_footer_tail(slice).map(|f| f.metadata_length())
}
12 changes: 6 additions & 6 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1103,9 +1103,9 @@ impl ColumnChunkMetaData {
}

/// Returns the range for the offset index if any
pub(crate) fn column_index_range(&self) -> Option<Range<usize>> {
let offset = usize::try_from(self.column_index_offset?).ok()?;
let length = usize::try_from(self.column_index_length?).ok()?;
pub(crate) fn column_index_range(&self) -> Option<Range<u64>> {
let offset = u64::try_from(self.column_index_offset?).ok()?;
let length = u64::try_from(self.column_index_length?).ok()?;
Comment on lines +1106 to +1108
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these might be valid cases that we'd need to change

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree

Some(offset..(offset + length))
}

Expand All @@ -1120,9 +1120,9 @@ impl ColumnChunkMetaData {
}

/// Returns the range for the offset index if any
pub(crate) fn offset_index_range(&self) -> Option<Range<usize>> {
let offset = usize::try_from(self.offset_index_offset?).ok()?;
let length = usize::try_from(self.offset_index_length?).ok()?;
pub(crate) fn offset_index_range(&self) -> Option<Range<u64>> {
let offset = u64::try_from(self.offset_index_offset?).ok()?;
let length = u64::try_from(self.offset_index_length?).ok()?;
Some(offset..(offset + length))
}

Expand Down
66 changes: 32 additions & 34 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ pub struct ParquetMetaDataReader {
metadata: Option<ParquetMetaData>,
column_index: bool,
offset_index: bool,
prefetch_hint: Option<usize>,
// Size of the serialized thrift metadata plus the 8 byte footer. Only set if
// `self.parse_metadata` is called.
metadata_size: Option<usize>,
prefetch_hint: Option<u64>,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As these are lengths, they probably should be u64 as well (though I realize it is unlikely people will be "pre" fetching GBs of data

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think usize is appropriate here...one wouldn't be able to fetch more than 4GB on a 32-bit machine anyway. Also, there are only 4 bytes available in the footer for the metadata length, so metadata_size could probably be change to a u32 rather than u64.

/// Size of the serialized thrift metadata plus the 8 byte footer. Only set if
/// `self.parse_metadata` is called.
metadata_size: Option<u64>,
#[cfg(feature = "encryption")]
file_decryption_properties: Option<FileDecryptionProperties>,
}
Expand All @@ -84,13 +84,13 @@ pub struct ParquetMetaDataReader {
///
/// This is parsed from the last 8 bytes of the Parquet file
pub struct FooterTail {
metadata_length: usize,
metadata_length: u64,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here...u32 if not left as usize

encrypted_footer: bool,
}

impl FooterTail {
/// The length of the footer metadata in bytes
pub fn metadata_length(&self) -> usize {
pub fn metadata_length(&self) -> u64 {
self.metadata_length
}

Expand Down Expand Up @@ -151,7 +151,7 @@ impl ParquetMetaDataReader {
/// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
/// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
/// in extra fetches being performed.
pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
pub fn with_prefetch_hint(mut self, prefetch: Option<u64>) -> Self {
self.prefetch_hint = prefetch;
self
}
Expand Down Expand Up @@ -209,7 +209,7 @@ impl ParquetMetaDataReader {
/// the request, and must include the Parquet footer. If page indexes are desired, the buffer
/// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
self.try_parse_sized(reader, reader.len() as usize)
self.try_parse_sized(reader, reader.len())
}

/// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
Expand Down Expand Up @@ -284,13 +284,13 @@ impl ParquetMetaDataReader {
/// }
/// let metadata = reader.finish().unwrap();
/// ```
pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
self.metadata = match self.parse_metadata(reader) {
Ok(metadata) => Some(metadata),
Err(ParquetError::NeedMoreData(needed)) => {
// If reader is the same length as `file_size` then presumably there is no more to
// read, so return an EOF error.
if file_size == reader.len() as usize || needed > file_size {
if file_size == reader.len() || needed > file_size {
return Err(eof_err!(
"Parquet file too small. Size is {} but need {}",
file_size,
Expand All @@ -315,7 +315,7 @@ impl ParquetMetaDataReader {
/// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
/// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
self.read_page_indexes_sized(reader, reader.len() as usize)
self.read_page_indexes_sized(reader, reader.len())
}

/// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
Expand All @@ -326,7 +326,7 @@ impl ParquetMetaDataReader {
pub fn read_page_indexes_sized<R: ChunkReader>(
&mut self,
reader: &R,
file_size: usize,
file_size: u64,
) -> Result<()> {
if self.metadata.is_none() {
return Err(general_err!(
Expand All @@ -350,7 +350,7 @@ impl ParquetMetaDataReader {

// Check to see if needed range is within `file_range`. Checking `range.end` seems
// redundant, but it guards against `range_for_page_index()` returning garbage.
let file_range = file_size.saturating_sub(reader.len() as usize)..file_size;
let file_range = file_size.saturating_sub(reader.len())..file_size;
if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
// Requested range starts beyond EOF
if range.end > file_size {
Expand Down Expand Up @@ -378,7 +378,7 @@ impl ParquetMetaDataReader {
}

let bytes_needed = range.end - range.start;
let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
let offset = range.start;

self.parse_column_index(&bytes, offset)?;
Expand All @@ -397,7 +397,7 @@ impl ParquetMetaDataReader {
pub async fn load_and_finish<F: MetadataFetch>(
mut self,
fetch: F,
file_size: usize,
file_size: u64,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these various load APIs also need to be updated to use a u64 file length to support larger files in parquet

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

) -> Result<ParquetMetaData> {
self.try_load(fetch, file_size).await?;
self.finish()
Expand All @@ -423,11 +423,7 @@ impl ParquetMetaDataReader {
/// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
/// performed by this function.
#[cfg(all(feature = "async", feature = "arrow"))]
pub async fn try_load<F: MetadataFetch>(
&mut self,
mut fetch: F,
file_size: usize,
) -> Result<()> {
pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;

self.metadata = Some(metadata);
Expand Down Expand Up @@ -473,7 +469,7 @@ impl ParquetMetaDataReader {
async fn load_page_index_with_remainder<F: MetadataFetch>(
&mut self,
mut fetch: F,
remainder: Option<(usize, Bytes)>,
remainder: Option<(u64, Bytes)>,
) -> Result<()> {
if self.metadata.is_none() {
return Err(general_err!("Footer metadata is not present"));
Expand Down Expand Up @@ -507,7 +503,8 @@ impl ParquetMetaDataReader {
Ok(())
}

fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
let start_offset: usize = start_offset.try_into()?;
let metadata = self.metadata.as_mut().unwrap();
if self.column_index {
let index = metadata
Expand All @@ -531,7 +528,8 @@ impl ParquetMetaDataReader {
Ok(())
}

fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
let start_offset: usize = start_offset.try_into()?;
let metadata = self.metadata.as_mut().unwrap();
if self.offset_index {
let index = metadata
Expand All @@ -555,7 +553,7 @@ impl ParquetMetaDataReader {
Ok(())
}

fn range_for_page_index(&self) -> Option<Range<usize>> {
fn range_for_page_index(&self) -> Option<Range<u64>> {
// sanity check
self.metadata.as_ref()?;

Expand Down Expand Up @@ -592,7 +590,7 @@ impl ParquetMetaDataReader {
let footer_metadata_len = FOOTER_SIZE + metadata_len;
self.metadata_size = Some(footer_metadata_len);

if footer_metadata_len > file_size as usize {
if footer_metadata_len > file_size {
return Err(ParquetError::NeedMoreData(footer_metadata_len));
}

Expand All @@ -607,7 +605,7 @@ impl ParquetMetaDataReader {
/// been provided, then return that value if it is larger than the size of the Parquet
/// file footer (8 bytes). Otherwise returns `8`.
#[cfg(all(feature = "async", feature = "arrow"))]
fn get_prefetch_size(&self) -> usize {
fn get_prefetch_size(&self) -> u64 {
if let Some(prefetch) = self.prefetch_hint {
if prefetch > FOOTER_SIZE {
return prefetch;
Expand All @@ -620,8 +618,8 @@ impl ParquetMetaDataReader {
async fn load_metadata<F: MetadataFetch>(
&self,
fetch: &mut F,
file_size: usize,
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
file_size: u64,
) -> Result<(ParquetMetaData, Option<(u64, Bytes)>)> {
let prefetch = self.get_prefetch_size();

if file_size < FOOTER_SIZE {
Expand Down Expand Up @@ -679,7 +677,7 @@ impl ParquetMetaDataReader {
async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
&self,
fetch: &mut F,
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
) -> Result<(ParquetMetaData, Option<(u64, Bytes)>)> {
let prefetch = self.get_prefetch_size();

let suffix = fetch.fetch_suffix(prefetch as _).await?;
Expand Down Expand Up @@ -747,18 +745,18 @@ impl ParquetMetaDataReader {
} else {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
};
// get the metadata length from the footer
// get the metadata length from the footer (infallible)
let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
let metadata_length = metadata_len as u64;
Ok(FooterTail {
// u32 won't be larger than usize in most cases
metadata_length: metadata_len as usize,
metadata_length,
encrypted_footer,
})
}

/// Decodes the Parquet footer, returning the metadata length in bytes
#[deprecated(note = "use decode_footer_tail instead")]
pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<u64> {
Self::decode_footer_tail(slice).map(|f| f.metadata_length)
}

Expand Down Expand Up @@ -1091,7 +1089,7 @@ mod tests {
#[test]
fn test_try_parse() {
let file = get_test_file("alltypes_tiny_pages.parquet");
let len = file.len() as usize;
let len = file.len();

let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

Expand Down
Loading
Loading