Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ thiserror = "1"
tokio = "1.40"
url = "2"

[patch.crates-io]
object_store = { git = "https://github.com/kylebarron/arrow-rs", branch = "kyle/list-returns-static-stream" }
# [patch.crates-io]
# object_store = { git = "https://github.com/kylebarron/arrow-rs", branch = "kyle/list-returns-static-stream" }

[profile.release]
lto = true
Expand Down
1 change: 1 addition & 0 deletions obstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tokio = { workspace = true, features = [
"sync",
] }
url = { workspace = true }
yoke = { version = "0.7.4", features = ["derive"] }

# We opt-in to using rustls as the TLS provider for reqwest, which is the HTTP
# library used by object_store.
Expand Down
82 changes: 58 additions & 24 deletions obstore/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use pyo3::prelude::*;
use pyo3_arrow::PyRecordBatch;
use pyo3_object_store::{PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
use tokio::sync::Mutex;
use yoke::Yoke;

use crate::runtime::get_runtime;

Expand Down Expand Up @@ -47,6 +48,15 @@ impl IntoPy<PyObject> for PyObjectMeta {
}
}

/// Newtype around a fused, boxed stream of `ObjectMeta` results.
///
/// Deriving `yoke::Yokeable` lets this borrowed stream be used as the yokeable
/// half of a `Yoke<ListStreamWrapper<'static>, Arc<dyn ObjectStore>>`, which is
/// the type `PyListStream` stores and `list` constructs via `Yoke::attach_to_cart`.
#[derive(yoke::Yokeable)]
struct ListStreamWrapper<'a>(Fuse<BoxStream<'a, Result<ObjectMeta, object_store::Error>>>);

impl<'a> ListStreamWrapper<'a> {
fn new(stream: BoxStream<'a, Result<ObjectMeta, object_store::Error>>) -> Self {
Self(stream.fuse())
}
}

// Note: we fuse the underlying stream so that we can get `None` multiple times.
//
// In general, you can't poll an iterator after it's already emitted None. But the issue here is
Expand All @@ -65,19 +75,19 @@ impl IntoPy<PyObject> for PyObjectMeta {
// - https://docs.rs/futures/latest/futures/prelude/stream/trait.StreamExt.html#method.fuse
// NOTE(review): this span comes from a rendered diff, so both the removed and
// the added `stream` field lines appear below; only one can exist in the real
// struct. Plain `//` comments are used so the Python-visible docstring of the
// `#[pyclass]` is not affected.
#[pyclass(name = "ListStream")]
pub(crate) struct PyListStream {
// Pre-change field: the fused `'static` boxed stream behind a shared async mutex.
stream: Arc<Mutex<Fuse<BoxStream<'static, object_store::Result<ObjectMeta>>>>>,
// Post-change field: a `Yoke` pairing the wrapped stream with the
// `Arc<dyn ObjectStore>` cart it was attached to.
stream: Arc<Mutex<Yoke<ListStreamWrapper<'static>, Arc<dyn ObjectStore>>>>,
// presumably the chunk size handed to `next_stream`, which returns once
// `metas.len() >= chunk_size` — TODO confirm against the methods outside this view
chunk_size: usize,
// presumably forwarded to `next_stream` / `collect_stream` to choose between
// the Arrow and Native result variants — TODO confirm
return_arrow: bool,
}

impl PyListStream {
fn new(
stream: BoxStream<'static, object_store::Result<ObjectMeta>>,
stream: Yoke<ListStreamWrapper<'static>, Arc<dyn ObjectStore>>,
chunk_size: usize,
return_arrow: bool,
) -> Self {
Self {
stream: Arc::new(Mutex::new(stream.fuse())),
stream: Arc::new(Mutex::new(stream)),
chunk_size,
return_arrow,
}
Expand Down Expand Up @@ -140,15 +150,15 @@ impl IntoPy<PyObject> for PyListIterResult {
}

async fn next_stream(
stream: Arc<Mutex<Fuse<BoxStream<'static, object_store::Result<ObjectMeta>>>>>,
stream: Arc<Mutex<ListStreamWrapper<'static>>>,
chunk_size: usize,
sync: bool,
return_arrow: bool,
) -> PyResult<PyListIterResult> {
let mut stream = stream.lock().await;
let mut metas: Vec<PyObjectMeta> = vec![];
loop {
match stream.next().await {
match stream.0.next().await {
Some(Ok(meta)) => {
metas.push(PyObjectMeta(meta));
if metas.len() >= chunk_size {
Expand Down Expand Up @@ -188,26 +198,44 @@ async fn next_stream(
}

// Drains the locked list stream, pushing every `Ok` item into `metas`, returning
// the first error converted through `PyObjectStoreError`, and at end-of-stream
// returning either `PyListIterResult::Arrow` or `PyListIterResult::Native`
// depending on `return_arrow`.
//
// NOTE(review): this span comes from a rendered diff, so removed and added
// lines are interleaved (two `stream` parameters, two `match` bodies). It is
// not valid Rust as shown.
//
// NOTE(review): in the post-change lines, `stream_inner.0.next().await` sits
// inside the non-async closure passed to `stream.with_mut(...)`; `.await` is
// only permitted in an async context, so this should not compile. The `return`
// statements inside that closure would also return from the closure rather
// than from `collect_stream`, so the surrounding `loop` would have no exit.
//
// NOTE(review): the closure mutates the local `metas`; check yoke's bounds on
// the `with_mut` closure to confirm borrowing a local is allowed.
async fn collect_stream(
stream: Arc<Mutex<Fuse<BoxStream<'static, object_store::Result<ObjectMeta>>>>>,
// stream: Arc<Mutex<ListStreamWrapper<'static>>>,
stream: Arc<Mutex<Yoke<ListStreamWrapper<'static>, Arc<dyn ObjectStore>>>>,
return_arrow: bool,
) -> PyResult<PyListIterResult> {
let mut stream = stream.lock().await;
let mut metas: Vec<PyObjectMeta> = vec![];
loop {
match stream.next().await {
Some(Ok(meta)) => {
metas.push(PyObjectMeta(meta));
}
Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()),
None => match return_arrow {
true => {
return Ok(PyListIterResult::Arrow(object_meta_to_arrow(&metas)));
}
false => {
return Ok(PyListIterResult::Native(metas));
stream.with_mut(|stream_inner| {
match stream_inner.0.next().await {
Some(Ok(meta)) => {
metas.push(PyObjectMeta(meta));
}
},
};
Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()),
None => match return_arrow {
true => {
return Ok(PyListIterResult::Arrow(object_meta_to_arrow(&metas)));
}
false => {
return Ok(PyListIterResult::Native(metas));
}
},
};
});

// match stream.next().await {
// Some(Ok(meta)) => {
// metas.push(PyObjectMeta(meta));
// }
// Some(Err(e)) => return Err(PyObjectStoreError::from(e).into()),
// None => match return_arrow {
// true => {
// return Ok(PyListIterResult::Arrow(object_meta_to_arrow(&metas)));
// }
// false => {
// return Ok(PyListIterResult::Native(metas));
// }
// },
// };
}
}

Expand Down Expand Up @@ -357,11 +385,17 @@ pub(crate) fn list(

let store = store.into_inner().clone();
let prefix = prefix.map(|s| s.into());
let stream = if let Some(offset) = offset {
store.list_with_offset(prefix.as_ref(), &offset.into())
} else {
store.list(prefix.as_ref())
};

let stream: Yoke<ListStreamWrapper<'static>, Arc<dyn ObjectStore>> =
if let Some(offset) = offset {
Yoke::attach_to_cart(store.clone(), |cart| {
ListStreamWrapper::new(cart.list_with_offset(prefix.as_ref(), &offset.into()))
})
} else {
Yoke::attach_to_cart(store.clone(), |cart| {
ListStreamWrapper::new(cart.list(prefix.as_ref()))
})
};
Ok(PyListStream::new(stream, chunk_size, return_arrow))
}

Expand Down
Loading