Skip to content

Commit 642d6be

Browse files
committed
feat: new torrent repository using crossbeam_skiplist::SkipMap
SkipMap is an ordered map based on a lock-free skip list. It's al alternative to BTreeMap which supports concurrent access across multiple threads. One of the performance problems with the current solution is we can only add one torrent at the time because threads need to lock the whole BTreeMap. The SkipMap should avoid that problem. More info about SkiMap: https://docs.rs/crossbeam-skiplist/latest/crossbeam_skiplist/struct.SkipMap.html#method.remove The aquatic UDP load test was executed with the current implementation and the new one: Current Implementation: Requests out: 397287.37/second Responses in: 357549.15/second - Connect responses: 177073.94 - Announce responses: 176905.36 - Scrape responses: 3569.85 - Error responses: 0.00 Peers per announce response: 0.00 Announce responses per info hash: - p10: 1 - p25: 1 - p50: 1 - p75: 1 - p90: 2 - p95: 3 - p99: 104 - p99.9: 287 - p100: 371 New Implementation: Requests out: 396788.68/second Responses in: 357105.27/second - Connect responses: 176662.91 - Announce responses: 176863.44 - Scrape responses: 3578.91 - Error responses: 0.00 Peers per announce response: 0.00 Announce responses per info hash: - p10: 1 - p25: 1 - p50: 1 - p75: 1 - p90: 2 - p95: 3 - p99: 105 - p99.9: 287 - p100: 351 The result is pretty similar but the benchmarking for the repository using criterios shows that this implementations is a litle bit better than the current one.
1 parent 608585e commit 642d6be

File tree

7 files changed

+137
-5
lines changed

7 files changed

+137
-5
lines changed

cSpell.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@
135135
"Shareaza",
136136
"sharktorrent",
137137
"SHLVL",
138+
"skiplist",
138139
"socketaddr",
139140
"sqllite",
140141
"subsec",

packages/torrent-repository/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@ rust-version.workspace = true
1616
version.workspace = true
1717

1818
[dependencies]
19+
crossbeam-skiplist = "0.1"
1920
futures = "0.3.29"
2021
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
21-
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" }
22-
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" }
2322
torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" }
23+
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" }
24+
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" }
2425

2526
[dev-dependencies]
2627
criterion = { version = "0", features = ["async_tokio"] }

packages/torrent-repository/benches/repository_benchmark.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ mod helpers;
55
use criterion::{criterion_group, criterion_main, Criterion};
66
use torrust_tracker_torrent_repository::{
77
TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd,
8-
TorrentsRwLockTokioMutexTokio,
8+
TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
99
};
1010

1111
use crate::helpers::{asyn, sync};
@@ -45,6 +45,10 @@ fn add_one_torrent(c: &mut Criterion) {
4545
.iter_custom(asyn::add_one_torrent::<TorrentsRwLockTokioMutexTokio, _>);
4646
});
4747

48+
group.bench_function("SkipMapMutexStd", |b| {
49+
b.iter_custom(sync::add_one_torrent::<TorrentsSkipMapMutexStd, _>);
50+
});
51+
4852
group.finish();
4953
}
5054

@@ -89,6 +93,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) {
8993
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<TorrentsRwLockTokioMutexTokio, _>(&rt, iters, None));
9094
});
9195

96+
group.bench_function("SkipMapMutexStd", |b| {
97+
b.to_async(&rt)
98+
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
99+
});
100+
92101
group.finish();
93102
}
94103

@@ -133,6 +142,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) {
133142
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<TorrentsRwLockTokioMutexTokio, _>(&rt, iters, None));
134143
});
135144

145+
group.bench_function("SkipMapMutexStd", |b| {
146+
b.to_async(&rt)
147+
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
148+
});
149+
136150
group.finish();
137151
}
138152

@@ -178,6 +192,11 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) {
178192
});
179193
});
180194

195+
group.bench_function("SkipMapMutexStd", |b| {
196+
b.to_async(&rt)
197+
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
198+
});
199+
181200
group.finish();
182201
}
183202

packages/torrent-repository/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::Arc;
22

3+
use repository::skip_map_mutex_std::CrossbeamSkipList;
34
use torrust_tracker_clock::clock;
45

56
pub mod entry;
@@ -16,6 +17,8 @@ pub type TorrentsRwLockTokio = repository::RwLockTokio<EntrySingle>;
1617
pub type TorrentsRwLockTokioMutexStd = repository::RwLockTokio<EntryMutexStd>;
1718
pub type TorrentsRwLockTokioMutexTokio = repository::RwLockTokio<EntryMutexTokio>;
1819

20+
pub type TorrentsSkipMapMutexStd = CrossbeamSkipList<EntryMutexStd>;
21+
1922
/// This code needs to be copied into each crate.
2023
/// Working version, for production.
2124
#[cfg(not(test))]

packages/torrent-repository/src/repository/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod rw_lock_std_mutex_tokio;
1111
pub mod rw_lock_tokio;
1212
pub mod rw_lock_tokio_mutex_std;
1313
pub mod rw_lock_tokio_mutex_tokio;
14+
pub mod skip_map_mutex_std;
1415

1516
use std::fmt::Debug;
1617

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use std::collections::BTreeMap;
2+
use std::sync::Arc;
3+
4+
use crossbeam_skiplist::SkipMap;
5+
use torrust_tracker_configuration::TrackerPolicy;
6+
use torrust_tracker_primitives::info_hash::InfoHash;
7+
use torrust_tracker_primitives::pagination::Pagination;
8+
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
9+
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
10+
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};
11+
12+
use super::Repository;
13+
use crate::entry::{Entry, EntrySync};
14+
use crate::{EntryMutexStd, EntrySingle};
15+
16+
#[derive(Default, Debug)]
17+
pub struct CrossbeamSkipList<T> {
18+
torrents: SkipMap<InfoHash, T>,
19+
}
20+
21+
impl Repository<EntryMutexStd> for CrossbeamSkipList<EntryMutexStd>
22+
where
23+
EntryMutexStd: EntrySync,
24+
EntrySingle: Entry,
25+
{
26+
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
27+
let entry = self.torrents.get_or_insert(*info_hash, Arc::default());
28+
entry.value().insert_or_update_peer_and_get_stats(peer)
29+
}
30+
31+
fn get(&self, key: &InfoHash) -> Option<EntryMutexStd> {
32+
let maybe_entry = self.torrents.get(key);
33+
maybe_entry.map(|entry| entry.value().clone())
34+
}
35+
36+
fn get_metrics(&self) -> TorrentsMetrics {
37+
let mut metrics = TorrentsMetrics::default();
38+
39+
for entry in &self.torrents {
40+
let stats = entry.value().lock().expect("it should get a lock").get_stats();
41+
metrics.complete += u64::from(stats.complete);
42+
metrics.downloaded += u64::from(stats.downloaded);
43+
metrics.incomplete += u64::from(stats.incomplete);
44+
metrics.torrents += 1;
45+
}
46+
47+
metrics
48+
}
49+
50+
fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> {
51+
match pagination {
52+
Some(pagination) => self
53+
.torrents
54+
.iter()
55+
.skip(pagination.offset as usize)
56+
.take(pagination.limit as usize)
57+
.map(|entry| (*entry.key(), entry.value().clone()))
58+
.collect(),
59+
None => self
60+
.torrents
61+
.iter()
62+
.map(|entry| (*entry.key(), entry.value().clone()))
63+
.collect(),
64+
}
65+
}
66+
67+
fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
68+
for (info_hash, completed) in persistent_torrents {
69+
if self.torrents.contains_key(info_hash) {
70+
continue;
71+
}
72+
73+
let entry = EntryMutexStd::new(
74+
EntrySingle {
75+
peers: BTreeMap::default(),
76+
downloaded: *completed,
77+
}
78+
.into(),
79+
);
80+
81+
// Since SkipMap is lock-free the torrent could have been inserted
82+
// after checking if it exists.
83+
self.torrents.get_or_insert(*info_hash, entry);
84+
}
85+
}
86+
87+
fn remove(&self, key: &InfoHash) -> Option<EntryMutexStd> {
88+
self.torrents.remove(key).map(|entry| entry.value().clone())
89+
}
90+
91+
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
92+
for entry in &self.torrents {
93+
entry.value().remove_inactive_peers(current_cutoff);
94+
}
95+
}
96+
97+
fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
98+
for entry in &self.torrents {
99+
if entry.value().is_good(policy) {
100+
continue;
101+
}
102+
103+
entry.remove();
104+
}
105+
}
106+
}

src/core/torrent/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
//! Peer that don not have a full copy of the torrent data are called "leechers".
2727
//!
2828
29-
use torrust_tracker_torrent_repository::TorrentsRwLockStdMutexStd;
29+
use torrust_tracker_torrent_repository::TorrentsSkipMapMutexStd;
3030

31-
pub type Torrents = TorrentsRwLockStdMutexStd; // Currently Used
31+
//pub type Torrents = TorrentsRwLockStdMutexStd; // Currently Used
32+
pub type Torrents = TorrentsSkipMapMutexStd; // Currently Used

0 commit comments

Comments
 (0)