diff --git a/Cargo.lock b/Cargo.lock index e77b5de6d..74d9aaafd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1027,6 +1027,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -3908,6 +3918,7 @@ dependencies = [ "clap", "colored", "config", + "crossbeam-skiplist", "derive_more", "fern", "futures", @@ -4013,6 +4024,7 @@ version = "3.0.0-alpha.12-develop" dependencies = [ "async-std", "criterion", + "crossbeam-skiplist", "futures", "rstest", "tokio", diff --git a/Cargo.toml b/Cargo.toml index d045b945a..f440799cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ chrono = { version = "0", default-features = false, features = ["clock"] } clap = { version = "4", features = ["derive", "env"] } colored = "2" config = "0" +crossbeam-skiplist = "0.1" derive_more = "0" fern = "0" futures = "0" @@ -63,8 +64,8 @@ serde_json = "1" serde_repr = "0" thiserror = "1" tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] } -torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "packages/configuration" } torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "packages/clock" } +torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "packages/configuration" } torrust-tracker-contrib-bencode = { version = "3.0.0-alpha.12-develop", path = "contrib/bencode" } torrust-tracker-located-error = { version = "3.0.0-alpha.12-develop", path = "packages/located-error" } torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "packages/primitives" } @@ -76,7 +77,7 @@ url = "2" uuid = { version = "1", features = ["v4"] } [package.metadata.cargo-machete] -ignored = ["serde_bytes"] +ignored = ["serde_bytes", "crossbeam-skiplist"] [dev-dependencies] local-ip-address = "0" @@ -105,4 +106,4 @@ opt-level = 3 [profile.release-debug] inherits = "release" -debug = true \ No newline at end of file +debug = true diff --git a/cSpell.json b/cSpell.json index 0ee2f8306..0480590af 100644 --- a/cSpell.json +++ b/cSpell.json @@ -135,6 +135,7 @@ "Shareaza", "sharktorrent", "SHLVL", + "skiplist", "socketaddr", "sqllite", "subsec", diff --git a/packages/torrent-repository/Cargo.toml b/packages/torrent-repository/Cargo.toml index 4cea8767f..5f1a20d32 100644 --- a/packages/torrent-repository/Cargo.toml +++ b/packages/torrent-repository/Cargo.toml @@ -16,11 +16,12 @@ rust-version.workspace = true version.workspace = true [dependencies] +crossbeam-skiplist = "0.1" futures = "0.3.29" tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] } -torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" } -torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" } torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" } +torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" } +torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" } [dev-dependencies] criterion = { version = "0", features = ["async_tokio"] } diff --git a/packages/torrent-repository/benches/repository_benchmark.rs 
b/packages/torrent-repository/benches/repository_benchmark.rs index a3684c8e2..65608c86c 100644 --- a/packages/torrent-repository/benches/repository_benchmark.rs +++ b/packages/torrent-repository/benches/repository_benchmark.rs @@ -5,7 +5,7 @@ mod helpers; use criterion::{criterion_group, criterion_main, Criterion}; use torrust_tracker_torrent_repository::{ TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, - TorrentsRwLockTokioMutexTokio, + TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, }; use crate::helpers::{asyn, sync}; @@ -45,6 +45,10 @@ fn add_one_torrent(c: &mut Criterion) { .iter_custom(asyn::add_one_torrent::); }); + group.bench_function("SkipMapMutexStd", |b| { + b.iter_custom(sync::add_one_torrent::); + }); + group.finish(); } @@ -89,6 +93,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) { .iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapMutexStd", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); + }); + group.finish(); } @@ -133,6 +142,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) { .iter_custom(|iters| asyn::update_one_torrent_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapMutexStd", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); + }); + group.finish(); } @@ -178,6 +192,11 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) { }); }); + group.bench_function("SkipMapMutexStd", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::update_multiple_torrents_in_parallel::(&rt, iters, None)); + }); + group.finish(); } diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index 8bb1b6def..ccaf579e3 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -1,5 +1,8 @@ use std::sync::Arc; +use repository::rw_lock_std::RwLockStd; +use repository::rw_lock_tokio::RwLockTokio; +use repository::skip_map_mutex_std::CrossbeamSkipList; use torrust_tracker_clock::clock; pub mod entry; @@ -9,12 +12,14 @@ pub type EntrySingle = entry::Torrent; pub type EntryMutexStd = Arc>; pub type EntryMutexTokio = Arc>; -pub type TorrentsRwLockStd = repository::RwLockStd; -pub type TorrentsRwLockStdMutexStd = repository::RwLockStd; -pub type TorrentsRwLockStdMutexTokio = repository::RwLockStd; -pub type TorrentsRwLockTokio = repository::RwLockTokio; -pub type TorrentsRwLockTokioMutexStd = repository::RwLockTokio; -pub type TorrentsRwLockTokioMutexTokio = repository::RwLockTokio; +pub type TorrentsRwLockStd = RwLockStd; +pub type TorrentsRwLockStdMutexStd = RwLockStd; +pub type TorrentsRwLockStdMutexTokio = RwLockStd; +pub type TorrentsRwLockTokio = RwLockTokio; +pub type TorrentsRwLockTokioMutexStd = RwLockTokio; +pub type TorrentsRwLockTokioMutexTokio = RwLockTokio; + +pub type TorrentsSkipMapMutexStd = CrossbeamSkipList; /// This code needs to be copied into each crate. /// Working version, for production. 
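The `TorrentsSkipMapMutexStd` alias introduced above pairs a lock-free `crossbeam_skiplist::SkipMap` keyed by info-hash with per-entry `Arc<std::sync::Mutex<...>>` guards: map lookups and inserts never take a collection-wide lock, while each torrent entry is still mutated under its own short-lived mutex. A minimal standalone sketch of that pattern, using a hypothetical `u64` key and a plain counter in place of the real `InfoHash` and entry types, might look like:

```rust
// Illustrative sketch only: `u64` and the counter stand in for the
// repository's real `InfoHash` and `EntryMutexStd` types.
use std::sync::{Arc, Mutex};

use crossbeam_skiplist::SkipMap;

fn main() {
    // Lock-free ordered map; no RwLock around the whole collection.
    let torrents: SkipMap<u64, Arc<Mutex<u32>>> = SkipMap::new();

    // `get_or_insert` only inserts when the key is absent, so concurrent
    // callers observe the same shared `Arc`-wrapped entry.
    let entry = torrents.get_or_insert(42, Arc::default());
    *entry.value().lock().expect("it should get a lock") += 1;

    // Iterating does not lock the map; only each entry's mutex is taken
    // briefly while its value is read.
    for entry in &torrents {
        let peers = *entry.value().lock().expect("it should get a lock");
        println!("torrent {} has {peers} peer(s)", entry.key());
    }
}
```

Because the skip list keeps keys ordered, iteration is already sorted by key, which is what lets the new repository's `get_paginated` return entries in info-hash order without an explicit sort.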
diff --git a/packages/torrent-repository/src/repository/mod.rs b/packages/torrent-repository/src/repository/mod.rs index 494040c9d..975a876d8 100644 --- a/packages/torrent-repository/src/repository/mod.rs +++ b/packages/torrent-repository/src/repository/mod.rs @@ -11,6 +11,7 @@ pub mod rw_lock_std_mutex_tokio; pub mod rw_lock_tokio; pub mod rw_lock_tokio_mutex_std; pub mod rw_lock_tokio_mutex_tokio; +pub mod skip_map_mutex_std; use std::fmt::Debug; @@ -40,37 +41,3 @@ pub trait RepositoryAsync: Debug + Default + Sized + 'static { peer: &peer::Peer, ) -> impl std::future::Future + Send; } - -#[derive(Default, Debug)] -pub struct RwLockStd { - torrents: std::sync::RwLock>, -} - -#[derive(Default, Debug)] -pub struct RwLockTokio { - torrents: tokio::sync::RwLock>, -} - -impl RwLockStd { - /// # Panics - /// - /// Panics if unable to get a lock. - pub fn write( - &self, - ) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap> { - self.torrents.write().expect("it should get lock") - } -} - -impl RwLockTokio { - pub fn write( - &self, - ) -> impl std::future::Future< - Output = tokio::sync::RwLockWriteGuard< - '_, - std::collections::BTreeMap, - >, - > { - self.torrents.write() - } -} diff --git a/packages/torrent-repository/src/repository/rw_lock_std.rs b/packages/torrent-repository/src/repository/rw_lock_std.rs index 9d7f29416..e9074a271 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std.rs @@ -11,6 +11,22 @@ use super::Repository; use crate::entry::Entry; use crate::{EntrySingle, TorrentsRwLockStd}; +#[derive(Default, Debug)] +pub struct RwLockStd { + pub(crate) torrents: std::sync::RwLock>, +} + +impl RwLockStd { + /// # Panics + /// + /// Panics if unable to get a lock. 
+ pub fn write( + &self, + ) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap> { + self.torrents.write().expect("it should get lock") + } +} + impl TorrentsRwLockStd { fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap> where diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio.rs index fa84e2451..d84074eaf 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio.rs @@ -11,6 +11,24 @@ use super::RepositoryAsync; use crate::entry::Entry; use crate::{EntrySingle, TorrentsRwLockTokio}; +#[derive(Default, Debug)] +pub struct RwLockTokio { + pub(crate) torrents: tokio::sync::RwLock>, +} + +impl RwLockTokio { + pub fn write( + &self, + ) -> impl std::future::Future< + Output = tokio::sync::RwLockWriteGuard< + '_, + std::collections::BTreeMap, + >, + > { + self.torrents.write() + } +} + impl TorrentsRwLockTokio { async fn get_torrents<'a>(&'a self) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap> where diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs new file mode 100644 index 000000000..0c0127b15 --- /dev/null +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -0,0 +1,106 @@ +use std::collections::BTreeMap; +use std::sync::Arc; + +use crossbeam_skiplist::SkipMap; +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::info_hash::InfoHash; +use torrust_tracker_primitives::pagination::Pagination; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; + +use super::Repository; +use crate::entry::{Entry, EntrySync}; +use crate::{EntryMutexStd, EntrySingle}; + +#[derive(Default, Debug)] +pub struct CrossbeamSkipList { + pub torrents: SkipMap, +} + +impl Repository for CrossbeamSkipList +where + EntryMutexStd: EntrySync, + EntrySingle: Entry, +{ + fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); + entry.value().insert_or_update_peer_and_get_stats(peer) + } + + fn get(&self, key: &InfoHash) -> Option { + let maybe_entry = self.torrents.get(key); + maybe_entry.map(|entry| entry.value().clone()) + } + + fn get_metrics(&self) -> TorrentsMetrics { + let mut metrics = TorrentsMetrics::default(); + + for entry in &self.torrents { + let stats = entry.value().lock().expect("it should get a lock").get_stats(); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); + metrics.torrents += 1; + } + + metrics + } + + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + match pagination { + Some(pagination) => self + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + None => self + .torrents + .iter() + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + } + } + + fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { + for (info_hash, completed) in persistent_torrents { + if 
self.torrents.contains_key(info_hash) { + continue; + } + + let entry = EntryMutexStd::new( + EntrySingle { + peers: BTreeMap::default(), + downloaded: *completed, + } + .into(), + ); + + // Since SkipMap is lock-free the torrent could have been inserted + // after checking if it exists. + self.torrents.get_or_insert(*info_hash, entry); + } + } + + fn remove(&self, key: &InfoHash) -> Option { + self.torrents.remove(key).map(|entry| entry.value().clone()) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + for entry in &self.torrents { + entry.value().remove_inactive_peers(current_cutoff); + } + } + + fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { + for entry in &self.torrents { + if entry.value().is_good(policy) { + continue; + } + + entry.remove(); + } + } +} diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs index 3a4b53d2f..5a86aa3cf 100644 --- a/packages/torrent-repository/tests/common/repo.rs +++ b/packages/torrent-repository/tests/common/repo.rs @@ -7,49 +7,54 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _}; use torrust_tracker_torrent_repository::{ EntrySingle, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, - TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, + TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, }; #[derive(Debug)] pub(crate) enum Repo { - Std(TorrentsRwLockStd), - StdMutexStd(TorrentsRwLockStdMutexStd), - StdMutexTokio(TorrentsRwLockStdMutexTokio), - Tokio(TorrentsRwLockTokio), - TokioMutexStd(TorrentsRwLockTokioMutexStd), - TokioMutexTokio(TorrentsRwLockTokioMutexTokio), + RwLockStd(TorrentsRwLockStd), + RwLockStdMutexStd(TorrentsRwLockStdMutexStd), + RwLockStdMutexTokio(TorrentsRwLockStdMutexTokio), + RwLockTokio(TorrentsRwLockTokio), + RwLockTokioMutexStd(TorrentsRwLockTokioMutexStd), + RwLockTokioMutexTokio(TorrentsRwLockTokioMutexTokio), + SkipMapMutexStd(TorrentsSkipMapMutexStd), } impl Repo { pub(crate) async fn get(&self, key: &InfoHash) -> Option { match self { - Repo::Std(repo) => repo.get(key), - Repo::StdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), - Repo::StdMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), - Repo::Tokio(repo) => repo.get(key).await, - Repo::TokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()), - Repo::TokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), + Repo::RwLockStd(repo) => repo.get(key), + Repo::RwLockStdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), + Repo::RwLockStdMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), + Repo::RwLockTokio(repo) => repo.get(key).await, + Repo::RwLockTokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()), + Repo::RwLockTokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), + Repo::SkipMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), } } + pub(crate) async fn get_metrics(&self) -> TorrentsMetrics { match self { - Repo::Std(repo) => repo.get_metrics(), - Repo::StdMutexStd(repo) => repo.get_metrics(), - Repo::StdMutexTokio(repo) => repo.get_metrics().await, - Repo::Tokio(repo) => repo.get_metrics().await, - Repo::TokioMutexStd(repo) => repo.get_metrics().await, - Repo::TokioMutexTokio(repo) => 
repo.get_metrics().await, + Repo::RwLockStd(repo) => repo.get_metrics(), + Repo::RwLockStdMutexStd(repo) => repo.get_metrics(), + Repo::RwLockStdMutexTokio(repo) => repo.get_metrics().await, + Repo::RwLockTokio(repo) => repo.get_metrics().await, + Repo::RwLockTokioMutexStd(repo) => repo.get_metrics().await, + Repo::RwLockTokioMutexTokio(repo) => repo.get_metrics().await, + Repo::SkipMapMutexStd(repo) => repo.get_metrics(), } } + pub(crate) async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { match self { - Repo::Std(repo) => repo.get_paginated(pagination), - Repo::StdMutexStd(repo) => repo + Repo::RwLockStd(repo) => repo.get_paginated(pagination), + Repo::RwLockStdMutexStd(repo) => repo .get_paginated(pagination) .iter() .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), - Repo::StdMutexTokio(repo) => { + Repo::RwLockStdMutexTokio(repo) => { let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; for (i, t) in repo.get_paginated(pagination).await { @@ -57,14 +62,14 @@ impl Repo { } v } - Repo::Tokio(repo) => repo.get_paginated(pagination).await, - Repo::TokioMutexStd(repo) => repo + Repo::RwLockTokio(repo) => repo.get_paginated(pagination).await, + Repo::RwLockTokioMutexStd(repo) => repo .get_paginated(pagination) .await .iter() .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), - Repo::TokioMutexTokio(repo) => { + Repo::RwLockTokioMutexTokio(repo) => { let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; for (i, t) in repo.get_paginated(pagination).await { @@ -72,76 +77,102 @@ impl Repo { } v } + Repo::SkipMapMutexStd(repo) => repo + .get_paginated(pagination) + .iter() + .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) + .collect(), } } + pub(crate) async fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { match self { - Repo::Std(repo) => repo.import_persistent(persistent_torrents), - Repo::StdMutexStd(repo) => repo.import_persistent(persistent_torrents), - Repo::StdMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, - Repo::Tokio(repo) => repo.import_persistent(persistent_torrents).await, - Repo::TokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await, - Repo::TokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, + Repo::RwLockStd(repo) => repo.import_persistent(persistent_torrents), + Repo::RwLockStdMutexStd(repo) => repo.import_persistent(persistent_torrents), + Repo::RwLockStdMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, + Repo::RwLockTokio(repo) => repo.import_persistent(persistent_torrents).await, + Repo::RwLockTokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await, + Repo::RwLockTokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, + Repo::SkipMapMutexStd(repo) => repo.import_persistent(persistent_torrents), } } + pub(crate) async fn remove(&self, key: &InfoHash) -> Option { match self { - Repo::Std(repo) => repo.remove(key), - Repo::StdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), - Repo::StdMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), - Repo::Tokio(repo) => repo.remove(key).await, - Repo::TokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()), - Repo::TokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), + Repo::RwLockStd(repo) => repo.remove(key), + Repo::RwLockStdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), + 
Repo::RwLockStdMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), + Repo::RwLockTokio(repo) => repo.remove(key).await, + Repo::RwLockTokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()), + Repo::RwLockTokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), + Repo::SkipMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), } } + pub(crate) async fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { match self { - Repo::Std(repo) => repo.remove_inactive_peers(current_cutoff), - Repo::StdMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), - Repo::StdMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, - Repo::Tokio(repo) => repo.remove_inactive_peers(current_cutoff).await, - Repo::TokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await, - Repo::TokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::RwLockStd(repo) => repo.remove_inactive_peers(current_cutoff), + Repo::RwLockStdMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), + Repo::RwLockStdMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::RwLockTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::RwLockTokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::RwLockTokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::SkipMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), } } + pub(crate) async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { match self { - Repo::Std(repo) => repo.remove_peerless_torrents(policy), - Repo::StdMutexStd(repo) => repo.remove_peerless_torrents(policy), - Repo::StdMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, - Repo::Tokio(repo) => repo.remove_peerless_torrents(policy).await, - Repo::TokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await, - Repo::TokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, + Repo::RwLockStd(repo) => repo.remove_peerless_torrents(policy), + Repo::RwLockStdMutexStd(repo) => repo.remove_peerless_torrents(policy), + Repo::RwLockStdMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, + Repo::RwLockTokio(repo) => repo.remove_peerless_torrents(policy).await, + Repo::RwLockTokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await, + Repo::RwLockTokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, + Repo::SkipMapMutexStd(repo) => repo.remove_peerless_torrents(policy), } } + pub(crate) async fn update_torrent_with_peer_and_get_stats( &self, info_hash: &InfoHash, peer: &peer::Peer, ) -> (bool, SwarmMetadata) { match self { - Repo::Std(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - Repo::StdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - Repo::StdMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::Tokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::TokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::TokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::RwLockStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), + Repo::RwLockStdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), + Repo::RwLockStdMutexTokio(repo) => 
repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::RwLockTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::RwLockTokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::RwLockTokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::SkipMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), } } + pub(crate) async fn insert(&self, info_hash: &InfoHash, torrent: EntrySingle) -> Option { match self { - Repo::Std(repo) => repo.write().insert(*info_hash, torrent), - Repo::StdMutexStd(repo) => Some(repo.write().insert(*info_hash, torrent.into())?.lock().unwrap().clone()), - Repo::StdMutexTokio(repo) => { - let r = repo.write().insert(*info_hash, torrent.into()); - match r { - Some(t) => Some(t.lock().await.clone()), - None => None, - } + Repo::RwLockStd(repo) => { + repo.write().insert(*info_hash, torrent); } - Repo::Tokio(repo) => repo.write().await.insert(*info_hash, torrent), - Repo::TokioMutexStd(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().unwrap().clone()), - Repo::TokioMutexTokio(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().await.clone()), - } + Repo::RwLockStdMutexStd(repo) => { + repo.write().insert(*info_hash, torrent.into()); + } + Repo::RwLockStdMutexTokio(repo) => { + repo.write().insert(*info_hash, torrent.into()); + } + Repo::RwLockTokio(repo) => { + repo.write().await.insert(*info_hash, torrent); + } + Repo::RwLockTokioMutexStd(repo) => { + repo.write().await.insert(*info_hash, torrent.into()); + } + Repo::RwLockTokioMutexTokio(repo) => { + repo.write().await.insert(*info_hash, torrent.into()); + } + Repo::SkipMapMutexStd(repo) => { + repo.torrents.insert(*info_hash, torrent.into()); + } + }; + self.get(info_hash).await } } diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index 7ffe17dd7..ab9648584 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -8,7 +8,9 @@ use torrust_tracker_primitives::info_hash::InfoHash; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::{NumberOfBytes, PersistentTorrents}; use torrust_tracker_torrent_repository::entry::Entry as _; -use torrust_tracker_torrent_repository::repository::{RwLockStd, RwLockTokio}; +use torrust_tracker_torrent_repository::repository::rw_lock_std::RwLockStd; +use torrust_tracker_torrent_repository::repository::rw_lock_tokio::RwLockTokio; +use torrust_tracker_torrent_repository::repository::skip_map_mutex_std::CrossbeamSkipList; use torrust_tracker_torrent_repository::EntrySingle; use crate::common::repo::Repo; @@ -16,30 +18,37 @@ use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; #[fixture] fn standard() -> Repo { - Repo::Std(RwLockStd::default()) + Repo::RwLockStd(RwLockStd::default()) } + #[fixture] fn standard_mutex() -> Repo { - Repo::StdMutexStd(RwLockStd::default()) + Repo::RwLockStdMutexStd(RwLockStd::default()) } #[fixture] fn standard_tokio() -> Repo { - Repo::StdMutexTokio(RwLockStd::default()) + Repo::RwLockStdMutexTokio(RwLockStd::default()) } #[fixture] fn tokio_std() -> Repo { - Repo::Tokio(RwLockTokio::default()) + Repo::RwLockTokio(RwLockTokio::default()) } + #[fixture] fn tokio_mutex() -> Repo { - Repo::TokioMutexStd(RwLockTokio::default()) + 
Repo::RwLockTokioMutexStd(RwLockTokio::default()) } #[fixture] fn tokio_tokio() -> Repo { - Repo::TokioMutexTokio(RwLockTokio::default()) + Repo::RwLockTokioMutexTokio(RwLockTokio::default()) +} + +#[fixture] +fn skip_list_std() -> Repo { + Repo::SkipMapMutexStd(CrossbeamSkipList::default()) } type Entries = Vec<(InfoHash, EntrySingle)>; @@ -223,7 +232,16 @@ fn policy_remove_persist() -> TrackerPolicy { #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_a_torrent_entry( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, ) { make(&repo, &entries).await; @@ -246,7 +264,16 @@ async fn it_should_get_a_torrent_entry( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, many_out_of_order: Entries, ) { @@ -279,7 +306,16 @@ async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_paginated( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, #[values(paginated_limit_zero(), paginated_limit_one(), paginated_limit_one_offset_one())] paginated: Pagination, ) { @@ -327,7 +363,16 @@ async fn it_should_get_paginated( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_get_metrics( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, ) { use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; @@ -359,7 +404,16 @@ async fn it_should_get_metrics( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_import_persistent_torrents( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, #[values(persistent_empty(), persistent_single(), persistent_three())] persistent_torrents: PersistentTorrents, ) { @@ -388,7 +442,16 @@ async fn it_should_import_persistent_torrents( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_remove_an_entry( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, ) { make(&repo, &entries).await; @@ -415,7 +478,16 @@ async fn it_should_remove_an_entry( 
#[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_remove_inactive_peers( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, ) { use std::ops::Sub as _; @@ -488,7 +560,16 @@ async fn it_should_remove_inactive_peers( #[case::in_order(many_hashed_in_order())] #[tokio::test] async fn it_should_remove_peerless_torrents( - #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[values( + standard(), + standard_mutex(), + standard_tokio(), + tokio_std(), + tokio_mutex(), + tokio_tokio(), + skip_list_std() + )] + repo: Repo, #[case] entries: Entries, #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, ) { diff --git a/src/core/torrent/mod.rs b/src/core/torrent/mod.rs index ab78de683..286a7e047 100644 --- a/src/core/torrent/mod.rs +++ b/src/core/torrent/mod.rs @@ -25,7 +25,6 @@ //! - The number of peers that have NOT completed downloading the torrent and are still active, that means they are actively participating in the network. //! Peer that don not have a full copy of the torrent data are called "leechers". //! +use torrust_tracker_torrent_repository::TorrentsSkipMapMutexStd; -use torrust_tracker_torrent_repository::TorrentsRwLockStdMutexStd; - -pub type Torrents = TorrentsRwLockStdMutexStd; // Currently Used +pub type Torrents = TorrentsSkipMapMutexStd; // Currently Used
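The new `import_persistent` checks `contains_key` and then uses `get_or_insert` instead of a plain `insert`: as the in-code comment notes, the skip map is lock-free, so another writer can add the same info-hash between the check and the insert, and `get_or_insert` keeps the already-present entry instead of clobbering it. A standalone sketch of that check-then-insert pattern, with hypothetical `u64` keys and a download counter standing in for the real `InfoHash` and entry types, is:

```rust
// Illustrative sketch of the check-then-insert pattern used by
// `import_persistent`; the key and value types are stand-ins, not the
// crate's own `InfoHash` and `EntryMutexStd`.
use std::sync::{Arc, Mutex};

use crossbeam_skiplist::SkipMap;

fn import(map: &SkipMap<u64, Arc<Mutex<u32>>>, persisted: &[(u64, u32)]) {
    for (info_hash, downloaded) in persisted {
        // Skip keys that already have live in-memory state.
        if map.contains_key(info_hash) {
            continue;
        }

        // Another thread may have inserted this key after the check above;
        // `get_or_insert` keeps the existing entry instead of overwriting it.
        map.get_or_insert(*info_hash, Arc::new(Mutex::new(*downloaded)));
    }
}

fn main() {
    let map = SkipMap::new();
    import(&map, &[(1, 10), (2, 0)]);
    assert_eq!(map.len(), 2);
}
```

With `pub type Torrents = TorrentsSkipMapMutexStd;` the tracker core now runs on the skip-map repository, and the `SkipMapMutexStd` rows added to the Criterion groups and rstest fixtures above let it be compared against the existing RwLock-based repositories (for example via `cargo bench -p torrust-tracker-torrent-repository`, assuming that is the package name declared in `packages/torrent-repository/Cargo.toml`).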