From 15d7a6f5566c11cbf6dc1821f5354c5949adab16 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 17 Jan 2025 10:47:13 +0100 Subject: [PATCH 1/2] feat: simplify LocalPool handling It has been a footgun for a few users, including myself, to make sure to keep around the `LocalPool`. This changes the behaviour to construct a `LocalPool` and keep it around by default. If necessary, in the builder one can provide a custom handle, if there is the need for a custom pool. --- README.md | 11 ++---- examples/custom-protocol.rs | 7 ++-- examples/hello-world-fetch.rs | 7 ++-- examples/hello-world-provide.rs | 5 ++- examples/local-swarm-discovery.rs | 7 ++-- examples/transfer.rs | 7 ++-- src/net_protocol.rs | 56 ++++++++++++++++++++++++++----- src/rpc.rs | 2 +- src/rpc/client/blobs.rs | 17 ++-------- tests/blobs.rs | 8 ++--- tests/gc.rs | 6 +--- tests/rpc.rs | 7 ++-- 12 files changed, 70 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 2193daae4..c8f25a9f9 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ Here is a basic example of how to set up `iroh-blobs` with `iroh`: ```rust use iroh::{protocol::Router, Endpoint}; -use iroh_blobs::{net_protocol::Blobs, util::local_pool::LocalPool}; +use iroh_blobs::net_protocol::Blobs; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -40,14 +40,10 @@ async fn main() -> anyhow::Result<()> { // we've built at number0 let endpoint = Endpoint::builder().discovery_n0().bind().await?; - // spawn a local pool with one thread per CPU - // for a single threaded pool use `LocalPool::single` - let local_pool = LocalPool::default(); - // create an in-memory blob store // use `iroh_blobs::net_protocol::Blobs::persistent` to load or create a // persistent blob store from a path - let blobs = Blobs::memory().build(local_pool.handle(), &endpoint); + let blobs = Blobs::memory().build(&endpoint); // turn on the "rpc" feature if you need to create blobs and tags clients let blobs_client = blobs.client(); @@ -60,9 +56,7 @@ async fn main() -> anyhow::Result<()> { .await?; // do fun stuff with the blobs protocol! - // make sure not to drop the local_pool before you are finished router.shutdown().await?; - drop(local_pool); drop(tags_client); Ok(()) } @@ -89,4 +83,3 @@ at your option. Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in this project by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions. - diff --git a/examples/custom-protocol.rs b/examples/custom-protocol.rs index 31b246f70..83e0c6bb8 100644 --- a/examples/custom-protocol.rs +++ b/examples/custom-protocol.rs @@ -48,9 +48,7 @@ use iroh::{ protocol::{ProtocolHandler, Router}, Endpoint, NodeId, }; -use iroh_blobs::{ - net_protocol::Blobs, rpc::client::blobs::MemClient, util::local_pool::LocalPool, Hash, -}; +use iroh_blobs::{net_protocol::Blobs, rpc::client::blobs::MemClient, Hash}; use tracing_subscriber::{prelude::*, EnvFilter}; #[derive(Debug, Parser)] @@ -89,8 +87,7 @@ async fn main() -> Result<()> { // Build a in-memory node. For production code, you'd want a persistent node instead usually. let endpoint = Endpoint::builder().bind().await?; let builder = Router::builder(endpoint); - let local_pool = LocalPool::default(); - let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint()); + let blobs = Blobs::memory().build(builder.endpoint()); let builder = builder.accept(iroh_blobs::ALPN, blobs.clone()); let blobs_client = blobs.client(); diff --git a/examples/hello-world-fetch.rs b/examples/hello-world-fetch.rs index d8fb68955..74c494c46 100644 --- a/examples/hello-world-fetch.rs +++ b/examples/hello-world-fetch.rs @@ -7,9 +7,7 @@ use std::{env, str::FromStr}; use anyhow::{bail, ensure, Context, Result}; use iroh::{protocol::Router, Endpoint}; -use iroh_blobs::{ - net_protocol::Blobs, ticket::BlobTicket, util::local_pool::LocalPool, BlobFormat, -}; +use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket, BlobFormat}; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -39,8 +37,7 @@ async fn main() -> Result<()> { // create a new node let endpoint = Endpoint::builder().bind().await?; let builder = Router::builder(endpoint); - let local_pool = LocalPool::default(); - let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint()); + let blobs = Blobs::memory().build(builder.endpoint()); let builder = builder.accept(iroh_blobs::ALPN, blobs.clone()); let node = builder.spawn().await?; let blobs_client = blobs.client(); diff --git a/examples/hello-world-provide.rs b/examples/hello-world-provide.rs index cd614dd9b..aa9fc7737 100644 --- a/examples/hello-world-provide.rs +++ b/examples/hello-world-provide.rs @@ -4,7 +4,7 @@ //! run this example from the project root: //! $ cargo run --example hello-world-provide use iroh::{protocol::Router, Endpoint}; -use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket, util::local_pool::LocalPool}; +use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket}; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -24,8 +24,7 @@ async fn main() -> anyhow::Result<()> { // create a new node let endpoint = Endpoint::builder().bind().await?; let builder = Router::builder(endpoint); - let local_pool = LocalPool::default(); - let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint()); + let blobs = Blobs::memory().build(builder.endpoint()); let builder = builder.accept(iroh_blobs::ALPN, blobs.clone()); let blobs_client = blobs.client(); let node = builder.spawn().await?; diff --git a/examples/local-swarm-discovery.rs b/examples/local-swarm-discovery.rs index ccbb93043..f91e2f077 100644 --- a/examples/local-swarm-discovery.rs +++ b/examples/local-swarm-discovery.rs @@ -13,9 +13,7 @@ use iroh::{ discovery::local_swarm_discovery::LocalSwarmDiscovery, protocol::Router, Endpoint, NodeAddr, PublicKey, RelayMode, SecretKey, }; -use iroh_blobs::{ - net_protocol::Blobs, rpc::client::blobs::WrapOption, util::local_pool::LocalPool, Hash, -}; +use iroh_blobs::{net_protocol::Blobs, rpc::client::blobs::WrapOption, Hash}; use tracing_subscriber::{prelude::*, EnvFilter}; use self::progress::show_download_progress; @@ -73,8 +71,7 @@ async fn main() -> anyhow::Result<()> { .bind() .await?; let builder = Router::builder(endpoint); - let local_pool = LocalPool::default(); - let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint()); + let blobs = Blobs::memory().build(builder.endpoint()); let builder = builder.accept(iroh_blobs::ALPN, blobs.clone()); let node = builder.spawn().await?; let blobs_client = blobs.client(); diff --git a/examples/transfer.rs b/examples/transfer.rs index 15b457f05..63e156579 100644 --- a/examples/transfer.rs +++ b/examples/transfer.rs @@ -6,7 +6,7 @@ use iroh_blobs::{ net_protocol::Blobs, rpc::client::blobs::{ReadAtLen, WrapOption}, ticket::BlobTicket, - util::{local_pool::LocalPool, SetTagOption}, + util::SetTagOption, }; #[tokio::main] @@ -14,10 +14,8 @@ async fn main() -> Result<()> { // Create an endpoint, it allows creating and accepting // connections in the iroh p2p world let endpoint = Endpoint::builder().discovery_n0().bind().await?; - // We initialize the Blobs protocol in-memory - let local_pool = LocalPool::default(); - let blobs = Blobs::memory().build(&local_pool, &endpoint); + let blobs = Blobs::memory().build(&endpoint); // Now we build a router that accepts blobs connections & routes them // to the blobs protocol. @@ -85,7 +83,6 @@ async fn main() -> Result<()> { // Gracefully shut down the node println!("Shutting down."); node.shutdown().await?; - local_pool.shutdown().await; Ok(()) } diff --git a/src/net_protocol.rs b/src/net_protocol.rs index ca62c146b..8a983ae8a 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -3,7 +3,12 @@ // TODO: reduce API surface and add documentation #![allow(missing_docs)] -use std::{collections::BTreeSet, fmt::Debug, ops::DerefMut, sync::Arc}; +use std::{ + collections::BTreeSet, + fmt::Debug, + ops::{Deref, DerefMut}, + sync::Arc, +}; use anyhow::{bail, Result}; use futures_lite::future::Boxed as BoxedFuture; @@ -17,7 +22,7 @@ use crate::{ provider::EventSender, store::GcConfig, util::{ - local_pool::{self, LocalPoolHandle}, + local_pool::{self, LocalPool, LocalPoolHandle}, SetTagOption, }, BlobFormat, Hash, @@ -41,9 +46,26 @@ impl Default for GcState { } } +#[derive(Debug)] +enum Rt { + Handle(LocalPoolHandle), + Owned(LocalPool), +} + +impl Deref for Rt { + type Target = LocalPoolHandle; + + fn deref(&self) -> &Self::Target { + match self { + Self::Handle(ref handle) => handle, + Self::Owned(ref pool) => pool.handle(), + } + } +} + #[derive(Debug)] pub(crate) struct BlobsInner { - pub(crate) rt: LocalPoolHandle, + rt: Rt, pub(crate) store: S, events: EventSender, pub(crate) downloader: Downloader, @@ -53,6 +75,12 @@ pub(crate) struct BlobsInner { pub(crate) batches: tokio::sync::Mutex, } +impl BlobsInner { + pub(crate) fn rt(&self) -> &LocalPoolHandle { + &self.rt + } +} + #[derive(Debug, Clone)] pub struct Blobs { pub(crate) inner: Arc>, @@ -119,6 +147,7 @@ impl BlobBatches { pub struct Builder { store: S, events: Option, + rt: Option, } impl Builder { @@ -128,13 +157,23 @@ impl Builder { self } + /// Set a custom `LocalPoolHandle` to use. + pub fn local_pool(mut self, rt: LocalPoolHandle) -> Self { + self.rt = Some(rt); + self + } + /// Build the Blobs protocol handler. - /// You need to provide a local pool handle and an endpoint. - pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Blobs { + /// You need to provide a the endpoint. + pub fn build(self, endpoint: &Endpoint) -> Blobs { + let rt = self + .rt + .map(Rt::Handle) + .unwrap_or_else(|| Rt::Owned(LocalPool::default())); let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone()); Blobs::new( self.store, - rt.clone(), + rt, self.events.unwrap_or_default(), downloader, endpoint.clone(), @@ -148,6 +187,7 @@ impl Blobs { Builder { store, events: None, + rt: None, } } } @@ -169,9 +209,9 @@ impl Blobs { } impl Blobs { - pub fn new( + fn new( store: S, - rt: LocalPoolHandle, + rt: Rt, events: EventSender, downloader: Downloader, endpoint: Endpoint, diff --git a/src/rpc.rs b/src/rpc.rs index e1739b52c..a17cbb77c 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -110,7 +110,7 @@ impl Handler { } fn rt(&self) -> &LocalPoolHandle { - &self.0.rt + self.0.rt() } fn endpoint(&self) -> &Endpoint { diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index 28d73316e..3b003fba0 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -1015,11 +1015,9 @@ mod tests { use super::RpcService; use crate::{ - downloader::Downloader, net_protocol::Blobs, provider::{CustomEventSender, EventSender}, rpc::client::{blobs, tags}, - util::local_pool::LocalPool, }; type RpcClient = quic_rpc::RpcClient; @@ -1029,7 +1027,6 @@ mod tests { pub struct Node { router: iroh::protocol::Router, client: RpcClient, - _local_pool: LocalPool, _rpc_task: AbortOnDropHandle<()>, } @@ -1067,19 +1064,12 @@ mod tests { .unwrap_or_else(|| Endpoint::builder().discovery_n0()) .bind() .await?; - let local_pool = LocalPool::single(); let mut router = Router::builder(endpoint.clone()); // Setup blobs - let downloader = - Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone()); - let blobs = Blobs::new( - store.clone(), - local_pool.handle().clone(), - events, - downloader, - endpoint.clone(), - ); + let blobs = Blobs::builder(store.clone()) + .events(events) + .build(&endpoint); router = router.accept(crate::ALPN, blobs.clone()); // Build the router @@ -1096,7 +1086,6 @@ mod tests { router, client, _rpc_task, - _local_pool: local_pool, }) } } diff --git a/tests/blobs.rs b/tests/blobs.rs index c74484050..6779ebe9a 100644 --- a/tests/blobs.rs +++ b/tests/blobs.rs @@ -5,14 +5,13 @@ use std::{ }; use iroh::Endpoint; -use iroh_blobs::{net_protocol::Blobs, store::GcConfig, util::local_pool::LocalPool}; +use iroh_blobs::{net_protocol::Blobs, store::GcConfig}; use testresult::TestResult; #[tokio::test] async fn blobs_gc_smoke() -> TestResult<()> { - let pool = LocalPool::default(); let endpoint = Endpoint::builder().bind().await?; - let blobs = Blobs::memory().build(pool.handle(), &endpoint); + let blobs = Blobs::memory().build(&endpoint); let client = blobs.client(); blobs.start_gc(GcConfig { period: Duration::from_millis(1), @@ -29,9 +28,8 @@ async fn blobs_gc_smoke() -> TestResult<()> { #[tokio::test] async fn blobs_gc_protected() -> TestResult<()> { - let pool = LocalPool::default(); let endpoint = Endpoint::builder().bind().await?; - let blobs = Blobs::memory().build(pool.handle(), &endpoint); + let blobs = Blobs::memory().build(&endpoint); let client = blobs.client(); let h1 = client.add_bytes(b"test".to_vec()).await?; let protected = Arc::new(Mutex::new(Vec::new())); diff --git a/tests/gc.rs b/tests/gc.rs index 56ab4746c..ea4526027 100644 --- a/tests/gc.rs +++ b/tests/gc.rs @@ -26,7 +26,6 @@ use iroh_blobs::{ MapMut, ReportLevel, Store, }, util::{ - local_pool::LocalPool, progress::{AsyncChannelProgressSender, ProgressSender as _}, Tag, }, @@ -42,7 +41,6 @@ pub struct Node { pub router: iroh::protocol::Router, pub blobs: Blobs, pub store: S, - pub _local_pool: LocalPool, } impl Node { @@ -100,8 +98,7 @@ pub fn simulate_remote(data: &[u8]) -> (blake3::Hash, Cursor) { async fn node(store: S, gc_period: Duration) -> (Node, async_channel::Receiver<()>) { let (gc_send, gc_recv) = async_channel::unbounded(); let endpoint = Endpoint::builder().discovery_n0().bind().await.unwrap(); - let local_pool = LocalPool::single(); - let blobs = Blobs::builder(store.clone()).build(&local_pool, &endpoint); + let blobs = Blobs::builder(store.clone()).build(&endpoint); let router = Router::builder(endpoint) .accept(iroh_blobs::ALPN, blobs.clone()) .spawn() @@ -120,7 +117,6 @@ async fn node(store: S, gc_period: Duration) -> (Node, async_channe store, router, blobs, - _local_pool: local_pool, }, gc_recv, ) diff --git a/tests/rpc.rs b/tests/rpc.rs index 6eefc4930..7dc12e7b2 100644 --- a/tests/rpc.rs +++ b/tests/rpc.rs @@ -1,7 +1,7 @@ #![cfg(feature = "test")] use std::{net::SocketAddr, path::PathBuf, vec}; -use iroh_blobs::{net_protocol::Blobs, util::local_pool::LocalPool}; +use iroh_blobs::net_protocol::Blobs; use quic_rpc::client::QuinnConnector; use tempfile::TempDir; use testresult::TestResult; @@ -15,16 +15,14 @@ type BlobsClient = iroh_blobs::rpc::client::blobs::Client; pub struct Node { pub router: iroh::protocol::Router, pub blobs: Blobs, - pub local_pool: LocalPool, pub rpc_task: AbortOnDropHandle<()>, } impl Node { pub async fn new(path: PathBuf) -> anyhow::Result<(Self, SocketAddr, Vec)> { let store = iroh_blobs::store::fs::Store::load(path).await?; - let local_pool = LocalPool::default(); let endpoint = iroh::Endpoint::builder().bind().await?; - let blobs = Blobs::builder(store).build(local_pool.handle(), &endpoint); + let blobs = Blobs::builder(store).build(&endpoint); let router = iroh::protocol::Router::builder(endpoint) .accept(iroh_blobs::ALPN, blobs.clone()) .spawn() @@ -41,7 +39,6 @@ impl Node { let node = Self { router, blobs, - local_pool, rpc_task, }; Ok((node, local_addr, key)) From 5e56fb0c50269ffb2c84a608bc8d020e1cc7a61a Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 17 Jan 2025 10:54:32 +0100 Subject: [PATCH 2/2] fix unused --- src/net_protocol.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 8a983ae8a..0eaca5781 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -241,7 +241,7 @@ impl Blobs { } pub fn rt(&self) -> &LocalPoolHandle { - &self.inner.rt + self.inner.rt() } pub fn downloader(&self) -> &Downloader {