Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ impl TryFrom<Blob> for ObjectMeta {
Ok(Self {
location: Path::parse(value.name)?,
last_modified: value.properties.last_modified,
size: value.properties.content_length as usize,
size: value.properties.content_length,
e_tag: value.properties.e_tag,
version: None, // For consistency with S3 and GCP which don't include this
})
Expand Down
8 changes: 4 additions & 4 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::{PutPayload, Result};
#[derive(Debug)]
pub struct ChunkedStore {
inner: Arc<dyn ObjectStore>,
chunk_size: usize,
chunk_size: usize, // chunks are in memory, so we use usize not u64
}

impl ChunkedStore {
Expand Down Expand Up @@ -138,7 +138,7 @@ impl ObjectStore for ChunkedStore {
})
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
self.inner.get_range(location, range).await
}

Expand Down Expand Up @@ -203,8 +203,8 @@ mod tests {

let mut remaining = 1001;
while let Some(next) = s.next().await {
let size = next.unwrap().len();
let expected = remaining.min(chunk_size);
let size = next.unwrap().len() as u64;
let expected = remaining.min(chunk_size as u64);
assert_eq!(size, expected);
remaining -= expected;
}
Expand Down
10 changes: 5 additions & 5 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ impl<T: GetClient> GetClientExt for T {

struct ContentRange {
/// The range of the object returned
range: Range<usize>,
range: Range<u64>,
/// The total size of the object being requested
size: usize,
size: u64,
}

impl ContentRange {
Expand All @@ -84,7 +84,7 @@ impl ContentRange {
let (start_s, end_s) = range.split_once('-')?;

let start = start_s.parse().ok()?;
let end: usize = end_s.parse().ok()?;
let end: u64 = end_s.parse().ok()?;

Some(Self {
size,
Expand Down Expand Up @@ -140,8 +140,8 @@ enum GetResultError {

#[error("Requested {expected:?}, got {actual:?}")]
UnexpectedRange {
expected: Range<usize>,
actual: Range<usize>,
expected: Range<u64>,
actual: Range<u64>,
},
}

Expand Down
2 changes: 1 addition & 1 deletion object_store/src/client/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct ListPrefix {
#[serde(rename_all = "PascalCase")]
pub struct ListContents {
pub key: String,
pub size: usize,
pub size: u64,
pub last_modified: DateTime<Utc>,
#[serde(rename = "ETag")]
pub e_tag: Option<String>,
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ impl MultiStatusResponse {
})?)
}

fn size(&self) -> Result<usize> {
fn size(&self) -> Result<u64> {
let size = self
.prop_stat
.prop
Expand Down Expand Up @@ -462,7 +462,7 @@ pub(crate) struct Prop {
last_modified: DateTime<Utc>,

#[serde(rename = "getcontentlength")]
content_length: Option<usize>,
content_length: Option<u64>,

#[serde(rename = "resourcetype")]
resource_type: ResourceType,
Expand Down
10 changes: 5 additions & 5 deletions object_store/src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) {
let range_result = storage.get_range(&location, range.clone()).await;

let bytes = range_result.unwrap();
assert_eq!(bytes, data.slice(range.clone()));
assert_eq!(bytes, data.slice(range.start as usize..range.end as usize));

let opts = GetOptions {
range: Some(GetRange::Bounded(2..5)),
Expand Down Expand Up @@ -190,11 +190,11 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) {
let ranges = vec![0..1, 2..3, 0..5];
let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
for (range, bytes) in ranges.iter().zip(bytes) {
assert_eq!(bytes, data.slice(range.clone()))
assert_eq!(bytes, data.slice(range.start as usize..range.end as usize));
}

let head = storage.head(&location).await.unwrap();
assert_eq!(head.size, data.len());
assert_eq!(head.size, data.len() as u64);

storage.delete(&location).await.unwrap();

Expand Down Expand Up @@ -934,7 +934,7 @@ pub async fn list_with_delimiter(storage: &DynObjectStore) {
let object = &result.objects[0];

assert_eq!(object.location, expected_location);
assert_eq!(object.size, data.len());
assert_eq!(object.size, data.len() as u64);

// ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ====================
let prefix = Path::from("mydb/wb/000/000/001");
Expand Down Expand Up @@ -1085,7 +1085,7 @@ pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore
.unwrap();

let meta = storage.head(&path).await.unwrap();
assert_eq!(meta.size, chunk_size * 2);
assert_eq!(meta.size, chunk_size as u64 * 2);

// Empty case
let path = Path::from("test_empty_multipart");
Expand Down
26 changes: 17 additions & 9 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@
//!
//! // Buffer the entire object in memory
//! let object: Bytes = result.bytes().await.unwrap();
//! assert_eq!(object.len(), meta.size);
//! assert_eq!(object.len() as u64, meta.size);
//!
//! // Alternatively stream the bytes from object storage
//! let stream = object_store.get(&path).await.unwrap().into_stream();
Expand Down Expand Up @@ -630,7 +630,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// in the given byte range.
///
/// See [`GetRange::Bounded`] for more details on how `range` gets interpreted
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let options = GetOptions {
range: Some(range.into()),
..Default::default()
Expand All @@ -640,7 +640,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {

/// Return the bytes that are stored at the specified location
/// in the given byte ranges
async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
coalesce_ranges(
ranges,
|range| self.get_range(location, range),
Expand Down Expand Up @@ -820,14 +820,14 @@ macro_rules! as_ref_impl {
self.as_ref().get_opts(location, options).await
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
self.as_ref().get_range(location, range).await
}

async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
ranges: &[Range<u64>],
) -> Result<Vec<Bytes>> {
self.as_ref().get_ranges(location, ranges).await
}
Expand Down Expand Up @@ -903,8 +903,10 @@ pub struct ObjectMeta {
pub location: Path,
/// The last modified time
pub last_modified: DateTime<Utc>,
/// The size in bytes of the object
pub size: usize,
/// The size in bytes of the object.
///
/// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
pub size: u64,
/// The unique identifier for the object
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
Expand Down Expand Up @@ -1019,7 +1021,9 @@ pub struct GetResult {
/// The [`ObjectMeta`] for this object
pub meta: ObjectMeta,
/// The range of bytes returned by this request
pub range: Range<usize>,
///
/// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
pub range: Range<u64>,
/// Additional object attributes
pub attributes: Attributes,
}
Expand Down Expand Up @@ -1060,7 +1064,11 @@ impl GetResult {
path: path.clone(),
})?;

let mut buffer = Vec::with_capacity(len);
let mut buffer = if let Ok(len) = len.try_into() {
Vec::with_capacity(len)
} else {
Vec::new()
};
file.take(len as _)
.read_to_end(&mut buffer)
.map_err(|source| local::Error::UnableToReadBytes { source, path })?;
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
Ok(permit_get_result(r, permit))
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_range(location, range).await
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_ranges(location, ranges).await
}
Expand Down
72 changes: 32 additions & 40 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::sync::Arc;
use std::time::SystemTime;
use std::{collections::BTreeSet, convert::TryFrom, io};
use std::{collections::BTreeSet, io};
use std::{collections::VecDeque, path::PathBuf};

use async_trait::async_trait;
Expand All @@ -44,12 +44,6 @@ use crate::{
/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[error("File size for {} did not fit in a usize: {}", path, source)]
FileSizeOverflowedUsize {
source: std::num::TryFromIntError,
path: String,
},

#[error("Unable to walk dir: {}", source)]
UnableToWalkDir { source: walkdir::Error },

Expand Down Expand Up @@ -83,8 +77,8 @@ pub(crate) enum Error {
#[error("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual)]
OutOfRange {
path: PathBuf,
expected: usize,
actual: usize,
expected: u64,
actual: u64,
},

#[error("Requested range was invalid")]
Expand Down Expand Up @@ -410,7 +404,7 @@ impl ObjectStore for LocalFileSystem {
let path = self.path_to_filesystem(&location)?;
maybe_spawn_blocking(move || {
let (file, metadata) = open_file(&path)?;
let meta = convert_metadata(metadata, location)?;
let meta = convert_metadata(metadata, location);
options.check_preconditions(&meta)?;

let range = match options.range {
Expand All @@ -430,7 +424,7 @@ impl ObjectStore for LocalFileSystem {
.await
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let path = self.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
let (mut file, _) = open_file(&path)?;
Expand All @@ -439,7 +433,7 @@ impl ObjectStore for LocalFileSystem {
.await
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let path = self.path_to_filesystem(location)?;
let ranges = ranges.to_vec();
maybe_spawn_blocking(move || {
Expand Down Expand Up @@ -825,7 +819,7 @@ impl Drop for LocalUpload {
pub(crate) fn chunked_stream(
mut file: File,
path: PathBuf,
range: Range<usize>,
range: Range<u64>,
chunk_size: usize,
) -> BoxStream<'static, Result<Bytes, super::Error>> {
futures::stream::once(async move {
Expand All @@ -847,17 +841,23 @@ pub(crate) fn chunked_stream(
return Ok(None);
}

let to_read = remaining.min(chunk_size);
let mut buffer = Vec::with_capacity(to_read);
let to_read = remaining.min(chunk_size as u64);
let cap = usize::try_from(to_read).map_err(|_e| Error::InvalidRange {
source: InvalidGetRange::TooLarge {
requested: to_read,
max: usize::MAX as u64,
},
})?;
let mut buffer = Vec::with_capacity(cap);
let read = (&mut file)
.take(to_read as u64)
.take(to_read)
.read_to_end(&mut buffer)
.map_err(|e| Error::UnableToReadBytes {
source: e,
path: path.clone(),
})?;

Ok(Some((buffer.into(), (file, path, remaining - read))))
Ok(Some((buffer.into(), (file, path, remaining - read as u64))))
})
},
);
Expand All @@ -867,22 +867,18 @@ pub(crate) fn chunked_stream(
.boxed()
}

pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<u64>) -> Result<Bytes> {
let to_read = range.end - range.start;
file.seek(SeekFrom::Start(range.start as u64))
.map_err(|source| {
let path = path.into();
Error::Seek { source, path }
})?;
file.seek(SeekFrom::Start(range.start)).map_err(|source| {
let path = path.into();
Error::Seek { source, path }
})?;

let mut buf = Vec::with_capacity(to_read);
let read = file
.take(to_read as u64)
.read_to_end(&mut buf)
.map_err(|source| {
let path = path.into();
Error::UnableToReadBytes { source, path }
})?;
let mut buf = Vec::with_capacity(to_read as usize);
let read = file.take(to_read).read_to_end(&mut buf).map_err(|source| {
let path = path.into();
Error::UnableToReadBytes { source, path }
})? as u64;

if read != to_read {
let error = Error::OutOfRange {
Expand Down Expand Up @@ -922,7 +918,7 @@ fn open_file(path: &PathBuf) -> Result<(File, Metadata)> {

fn convert_entry(entry: DirEntry, location: Path) -> Result<Option<ObjectMeta>> {
match entry.metadata() {
Ok(metadata) => convert_metadata(metadata, location).map(Some),
Ok(metadata) => Ok(Some(convert_metadata(metadata, location))),
Err(e) => {
if let Some(io_err) = e.io_error() {
if io_err.kind() == ErrorKind::NotFound {
Expand Down Expand Up @@ -960,20 +956,16 @@ fn get_etag(metadata: &Metadata) -> String {
format!("{inode:x}-{mtime:x}-{size:x}")
}

fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
fn convert_metadata(metadata: Metadata, location: Path) -> ObjectMeta {
let last_modified = last_modified(&metadata);
let size = usize::try_from(metadata.len()).map_err(|source| {
let path = location.as_ref().into();
Error::FileSizeOverflowedUsize { source, path }
})?;

Ok(ObjectMeta {
ObjectMeta {
location,
last_modified,
size,
size: metadata.len(),
e_tag: Some(get_etag(&metadata)),
version: None,
})
}
}

#[cfg(unix)]
Expand Down
Loading
Loading