Skip to content

Commit 62c5c72

Browse files
committed
Update more parquet APIs to use u64
1 parent b4f1f90 commit 62c5c72

File tree

8 files changed

+62
-47
lines changed

8 files changed

+62
-47
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ impl ChunkReader for ArrowColumnChunkData {
431431
))
432432
}
433433

434-
fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
434+
fn get_bytes(&self, _start: u64, _length: u64) -> Result<Bytes> {
435435
unimplemented!()
436436
}
437437
}

parquet/src/arrow/async_reader/metadata.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::arrow::async_reader::AsyncFileReader;
1919
use crate::errors::{ParquetError, Result};
2020
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
2121
use crate::file::page_index::index::Index;
22-
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
22+
use crate::file::page_index::index_reader::{acc_range_usize, decode_column_index, decode_offset_index};
2323
use crate::file::FOOTER_SIZE;
2424
use bytes::Bytes;
2525
use futures::future::BoxFuture;
@@ -125,7 +125,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
125125
footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
126126

127127
let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
128-
let length = footer.metadata_length();
128+
let length = footer.metadata_length() as usize;
129129

130130
if file_size < length + FOOTER_SIZE {
131131
return Err(ParquetError::EOF(format!(
@@ -181,8 +181,8 @@ impl<F: MetadataFetch> MetadataLoader<F> {
181181

182182
let mut range = None;
183183
for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
184-
range = acc_range(range, c.column_index_range());
185-
range = acc_range(range, c.offset_index_range());
184+
range = acc_range_usize(range, c.column_index_range());
185+
range = acc_range_usize(range, c.offset_index_range());
186186
}
187187
let range = match range {
188188
None => return Ok(()),
@@ -294,6 +294,8 @@ where
294294
F: FnMut(Range<usize>) -> Fut + Send,
295295
Fut: Future<Output = Result<Bytes>> + Send,
296296
{
297+
let prefetch = prefetch.map(|v| v as u64);
298+
let file_size = file_size as u64;
297299
let fetch = MetadataFetchFn(fetch);
298300
ParquetMetaDataReader::new()
299301
.with_prefetch_hint(prefetch)

parquet/src/arrow/async_reader/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
138138
}
139139

140140
impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
141-
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
141+
fn fetch_suffix(&mut self, suffix: u64) -> BoxFuture<'_, Result<Bytes>> {
142142
async move {
143143
self.seek(SeekFrom::End(-(suffix as i64))).await?;
144144
let mut buf = Vec::with_capacity(suffix);
@@ -1058,6 +1058,7 @@ impl ChunkReader for ColumnChunkData {
10581058
}
10591059

10601060
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
1061+
let length: usize = length.try_into()?;
10611062
Ok(self.get(start)?.slice(..length))
10621063
}
10631064
}

parquet/src/errors.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ pub enum ParquetError {
4646
ArrowError(String),
4747
/// Error when the requested column index is more than the
4848
/// number of columns in the row group
49-
IndexOutOfBound(usize, usize),
49+
IndexOutOfBound(u64, u64),
5050
/// An external error variant
5151
External(Box<dyn Error + Send + Sync>),
52-
/// Returned when a function needs more data to complete properly. The `usize` field indicates
52+
/// Returned when a function needs more data to complete properly. The `u64` field indicates
5353
/// the total number of bytes required, not the number of additional bytes.
54-
NeedMoreData(usize),
54+
NeedMoreData(u64),
5555
}
5656

5757
impl std::fmt::Display for ParquetError {

parquet/src/file/footer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,6 @@ pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
7676
since = "53.1.0",
7777
note = "Use ParquetMetaDataReader::decode_footer_tail"
7878
)]
79-
pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
79+
pub fn decode_footer(slice: &[u8; FOOTER_SIZE as usize]) -> Result<u64> {
8080
ParquetMetaDataReader::decode_footer_tail(slice).map(|f| f.metadata_length())
8181
}

parquet/src/file/metadata/reader.rs

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ pub struct ParquetMetaDataReader {
7272
metadata: Option<ParquetMetaData>,
7373
column_index: bool,
7474
offset_index: bool,
75-
prefetch_hint: Option<usize>,
76-
// Size of the serialized thrift metadata plus the 8 byte footer. Only set if
77-
// `self.parse_metadata` is called.
78-
metadata_size: Option<usize>,
75+
prefetch_hint: Option<u64>,
76+
/// Size of the serialized thrift metadata plus the 8 byte footer. Only set if
77+
/// `self.parse_metadata` is called.
78+
metadata_size: Option<u64>,
7979
#[cfg(feature = "encryption")]
8080
file_decryption_properties: Option<FileDecryptionProperties>,
8181
}
@@ -84,13 +84,13 @@ pub struct ParquetMetaDataReader {
8484
///
8585
/// This is parsed from the last 8 bytes of the Parquet file
8686
pub struct FooterTail {
87-
metadata_length: usize,
87+
metadata_length: u64,
8888
encrypted_footer: bool,
8989
}
9090

9191
impl FooterTail {
9292
/// The length of the footer metadata in bytes
93-
pub fn metadata_length(&self) -> usize {
93+
pub fn metadata_length(&self) -> u64 {
9494
self.metadata_length
9595
}
9696

@@ -151,7 +151,7 @@ impl ParquetMetaDataReader {
151151
/// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
152152
/// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
153153
/// in extra fetches being performed.
154-
pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
154+
pub fn with_prefetch_hint(mut self, prefetch: Option<u64>) -> Self {
155155
self.prefetch_hint = prefetch;
156156
self
157157
}
@@ -209,7 +209,7 @@ impl ParquetMetaDataReader {
209209
/// the request, and must include the Parquet footer. If page indexes are desired, the buffer
210210
/// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
211211
pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
212-
self.try_parse_sized(reader, reader.len() as usize)
212+
self.try_parse_sized(reader, reader.len())
213213
}
214214

215215
/// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
@@ -284,13 +284,13 @@ impl ParquetMetaDataReader {
284284
/// }
285285
/// let metadata = reader.finish().unwrap();
286286
/// ```
287-
pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
287+
pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
288288
self.metadata = match self.parse_metadata(reader) {
289289
Ok(metadata) => Some(metadata),
290290
Err(ParquetError::NeedMoreData(needed)) => {
291291
// If reader is the same length as `file_size` then presumably there is no more to
292292
// read, so return an EOF error.
293-
if file_size == reader.len() as usize || needed > file_size {
293+
if file_size == reader.len() || needed > file_size {
294294
return Err(eof_err!(
295295
"Parquet file too small. Size is {} but need {}",
296296
file_size,
@@ -315,7 +315,7 @@ impl ParquetMetaDataReader {
315315
/// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
316316
/// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
317317
pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
318-
self.read_page_indexes_sized(reader, reader.len() as usize)
318+
self.read_page_indexes_sized(reader, reader.len())
319319
}
320320

321321
/// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
@@ -326,7 +326,7 @@ impl ParquetMetaDataReader {
326326
pub fn read_page_indexes_sized<R: ChunkReader>(
327327
&mut self,
328328
reader: &R,
329-
file_size: usize,
329+
file_size: u64,
330330
) -> Result<()> {
331331
if self.metadata.is_none() {
332332
return Err(general_err!(
@@ -350,7 +350,7 @@ impl ParquetMetaDataReader {
350350

351351
// Check to see if needed range is within `file_range`. Checking `range.end` seems
352352
// redundant, but it guards against `range_for_page_index()` returning garbage.
353-
let file_range = file_size.saturating_sub(reader.len() as usize)..file_size;
353+
let file_range = file_size.saturating_sub(reader.len())..file_size;
354354
if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
355355
// Requested range starts beyond EOF
356356
if range.end > file_size {
@@ -378,7 +378,7 @@ impl ParquetMetaDataReader {
378378
}
379379

380380
let bytes_needed = range.end - range.start;
381-
let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
381+
let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
382382
let offset = range.start;
383383

384384
self.parse_column_index(&bytes, offset)?;
@@ -397,7 +397,7 @@ impl ParquetMetaDataReader {
397397
pub async fn load_and_finish<F: MetadataFetch>(
398398
mut self,
399399
fetch: F,
400-
file_size: usize,
400+
file_size: u64,
401401
) -> Result<ParquetMetaData> {
402402
self.try_load(fetch, file_size).await?;
403403
self.finish()
@@ -426,7 +426,7 @@ impl ParquetMetaDataReader {
426426
pub async fn try_load<F: MetadataFetch>(
427427
&mut self,
428428
mut fetch: F,
429-
file_size: usize,
429+
file_size: u64,
430430
) -> Result<()> {
431431
let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
432432

@@ -473,7 +473,7 @@ impl ParquetMetaDataReader {
473473
async fn load_page_index_with_remainder<F: MetadataFetch>(
474474
&mut self,
475475
mut fetch: F,
476-
remainder: Option<(usize, Bytes)>,
476+
remainder: Option<(u64, Bytes)>,
477477
) -> Result<()> {
478478
if self.metadata.is_none() {
479479
return Err(general_err!("Footer metadata is not present"));
@@ -507,7 +507,8 @@ impl ParquetMetaDataReader {
507507
Ok(())
508508
}
509509

510-
fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
510+
fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
511+
let start_offset: usize = start_offset.try_into()?;
511512
let metadata = self.metadata.as_mut().unwrap();
512513
if self.column_index {
513514
let index = metadata
@@ -531,7 +532,8 @@ impl ParquetMetaDataReader {
531532
Ok(())
532533
}
533534

534-
fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
535+
fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
536+
let start_offset: usize = start_offset.try_into()?;
535537
let metadata = self.metadata.as_mut().unwrap();
536538
if self.offset_index {
537539
let index = metadata
@@ -555,7 +557,7 @@ impl ParquetMetaDataReader {
555557
Ok(())
556558
}
557559

558-
fn range_for_page_index(&self) -> Option<Range<usize>> {
560+
fn range_for_page_index(&self) -> Option<Range<u64>> {
559561
// sanity check
560562
self.metadata.as_ref()?;
561563

@@ -592,7 +594,7 @@ impl ParquetMetaDataReader {
592594
let footer_metadata_len = FOOTER_SIZE + metadata_len;
593595
self.metadata_size = Some(footer_metadata_len);
594596

595-
if footer_metadata_len > file_size as usize {
597+
if footer_metadata_len > file_size {
596598
return Err(ParquetError::NeedMoreData(footer_metadata_len));
597599
}
598600

@@ -607,7 +609,7 @@ impl ParquetMetaDataReader {
607609
/// been provided, then return that value if it is larger than the size of the Parquet
608610
/// file footer (8 bytes). Otherwise returns `8`.
609611
#[cfg(all(feature = "async", feature = "arrow"))]
610-
fn get_prefetch_size(&self) -> usize {
612+
fn get_prefetch_size(&self) -> u64 {
611613
if let Some(prefetch) = self.prefetch_hint {
612614
if prefetch > FOOTER_SIZE {
613615
return prefetch;
@@ -620,8 +622,8 @@ impl ParquetMetaDataReader {
620622
async fn load_metadata<F: MetadataFetch>(
621623
&self,
622624
fetch: &mut F,
623-
file_size: usize,
624-
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
625+
file_size: u64,
626+
) -> Result<(ParquetMetaData, Option<(u64, Bytes)>)> {
625627
let prefetch = self.get_prefetch_size();
626628

627629
if file_size < FOOTER_SIZE {
@@ -679,7 +681,7 @@ impl ParquetMetaDataReader {
679681
async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
680682
&self,
681683
fetch: &mut F,
682-
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
684+
) -> Result<(ParquetMetaData, Option<(u64, Bytes)>)> {
683685
let prefetch = self.get_prefetch_size();
684686

685687
let suffix = fetch.fetch_suffix(prefetch as _).await?;
@@ -747,18 +749,18 @@ impl ParquetMetaDataReader {
747749
} else {
748750
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
749751
};
750-
// get the metadata length from the footer
752+
// get the metadata length from the footer (infallible)
751753
let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
754+
let metadata_length = metadata_len as u64;
752755
Ok(FooterTail {
753-
// u32 won't be larger than usize in most cases
754-
metadata_length: metadata_len as usize,
756+
metadata_length,
755757
encrypted_footer,
756758
})
757759
}
758760

759761
/// Decodes the Parquet footer, returning the metadata length in bytes
760762
#[deprecated(note = "use decode_footer_tail instead")]
761-
pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
763+
pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<u64> {
762764
Self::decode_footer_tail(slice).map(|f| f.metadata_length)
763765
}
764766

@@ -1091,7 +1093,7 @@ mod tests {
10911093
#[test]
10921094
fn test_try_parse() {
10931095
let file = get_test_file("alltypes_tiny_pages.parquet");
1094-
let len = file.len() as usize;
1096+
let len = file.len();
10951097

10961098
let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
10971099

parquet/src/file/page_index/index_reader.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,15 @@ use std::ops::Range;
3131
/// Computes the covering range of two optional ranges
3232
///
3333
/// For example `acc_range(Some(7..9), Some(1..3)) = Some(1..9)`
34-
pub(crate) fn acc_range(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Option<Range<usize>> {
34+
pub(crate) fn acc_range(a: Option<Range<u64>>, b: Option<Range<u64>>) -> Option<Range<u64>> {
35+
match (a, b) {
36+
(Some(a), Some(b)) => Some(a.start.min(b.start)..a.end.max(b.end)),
37+
(None, x) | (x, None) => x,
38+
}
39+
}
40+
41+
/// Computes the covering range of two optional ranges of `usize`
42+
pub(crate) fn acc_range_usize(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Option<Range<usize>> {
3543
match (a, b) {
3644
(Some(a), Some(b)) => Some(a.start.min(b.start)..a.end.max(b.end)),
3745
(None, x) | (x, None) => x,

parquet/src/file/reader.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use bytes::{Buf, Bytes};
2323
use std::fs::File;
2424
use std::io::{BufReader, Seek, SeekFrom};
2525
use std::{io::Read, sync::Arc};
26-
2726
use crate::bloom_filter::Sbbf;
2827
use crate::column::page::PageIterator;
2928
use crate::column::{page::PageReader, reader::ColumnReader};
@@ -77,7 +76,7 @@ pub trait ChunkReader: Length + Send + Sync {
7776
///
7877
/// Similarly to [`Self::get_read`], this method may have side-effects on
7978
/// previously returned readers.
80-
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes>;
79+
fn get_bytes(&self, start: u64, length: u64) -> Result<Bytes>;
8180
}
8281

8382
impl Length for File {
@@ -95,11 +94,13 @@ impl ChunkReader for File {
9594
Ok(BufReader::new(self.try_clone()?))
9695
}
9796

98-
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
99-
let mut buffer = Vec::with_capacity(length);
97+
fn get_bytes(&self, start: u64, length: u64) -> Result<Bytes> {
98+
let cap: usize = length.try_into()?;
99+
let mut buffer = Vec::with_capacity(cap);
100100
let mut reader = self.try_clone()?;
101101
reader.seek(SeekFrom::Start(start))?;
102102
let read = reader.take(length as _).read_to_end(&mut buffer)?;
103+
let read = read as u64;
103104

104105
if read != length {
105106
return Err(eof_err!(
@@ -126,8 +127,9 @@ impl ChunkReader for Bytes {
126127
Ok(self.slice(start..).reader())
127128
}
128129

129-
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
130-
let start = start as usize;
130+
fn get_bytes(&self, start: u64, length: u64) -> Result<Bytes> {
131+
let start: usize = start.try_into()?;
132+
let length: usize = length.try_into()?;
131133
Ok(self.slice(start..start + length))
132134
}
133135
}

0 commit comments

Comments
 (0)