Skip to content

Commit aa4bfba

Browse files
committed
refactor: segregate command and query for announce request
This changes the API of the torrent repository. The method: ``` fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata); ``` is replaced with: ``` fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer); fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata>; ``` The performance is not affected. Benchmaring is still using both methods in order to simulate `announce` requests. 1. The interface is simpler (command/query segregation. 2. In the long-term: - Returning swarm metadata in the announce request could be optional. The announce request process would be faster if the tracker does not have to mantain the swarm data. This is not likely to happen becuase the scrape request needs this metadata. - New repository performance improvements could be implemented. This allow decoupling peer lists from swarm metadata. The repository internally can have two data strcutures one for the peer list and another for the swarm metatada. Both using different locks.
1 parent b5fb03b commit aa4bfba

File tree

25 files changed

+259
-226
lines changed

25 files changed

+259
-226
lines changed

packages/torrent-repository/benches/helpers/asyn.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ where
1818

1919
let info_hash = InfoHash([0; 20]);
2020

21-
torrent_repository
22-
.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
23-
.await;
21+
torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER).await;
22+
23+
torrent_repository.get_swarm_metadata(&info_hash).await;
2424
}
2525

2626
start.elapsed()
@@ -37,19 +37,19 @@ where
3737
let handles = FuturesUnordered::new();
3838

3939
// Add the torrent/peer to the torrent repository
40-
torrent_repository
41-
.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
42-
.await;
40+
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER).await;
41+
42+
torrent_repository.get_swarm_metadata(info_hash).await;
4343

4444
let start = Instant::now();
4545

4646
for _ in 0..samples {
4747
let torrent_repository_clone = torrent_repository.clone();
4848

4949
let handle = runtime.spawn(async move {
50-
torrent_repository_clone
51-
.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
52-
.await;
50+
torrent_repository_clone.upsert_peer(info_hash, &DEFAULT_PEER).await;
51+
52+
torrent_repository_clone.get_swarm_metadata(info_hash).await;
5353

5454
if let Some(sleep_time) = sleep {
5555
let start_time = std::time::Instant::now();
@@ -87,9 +87,9 @@ where
8787
let torrent_repository_clone = torrent_repository.clone();
8888

8989
let handle = runtime.spawn(async move {
90-
torrent_repository_clone
91-
.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
92-
.await;
90+
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await;
91+
92+
torrent_repository_clone.get_swarm_metadata(&info_hash).await;
9393

9494
if let Some(sleep_time) = sleep {
9595
let start_time = std::time::Instant::now();
@@ -123,9 +123,8 @@ where
123123

124124
// Add the torrents/peers to the torrent repository
125125
for info_hash in &info_hashes {
126-
torrent_repository
127-
.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
128-
.await;
126+
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER).await;
127+
torrent_repository.get_swarm_metadata(info_hash).await;
129128
}
130129

131130
let start = Instant::now();
@@ -134,9 +133,8 @@ where
134133
let torrent_repository_clone = torrent_repository.clone();
135134

136135
let handle = runtime.spawn(async move {
137-
torrent_repository_clone
138-
.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
139-
.await;
136+
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await;
137+
torrent_repository_clone.get_swarm_metadata(&info_hash).await;
140138

141139
if let Some(sleep_time) = sleep {
142140
let start_time = std::time::Instant::now();

packages/torrent-repository/benches/helpers/sync.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ where
2020

2121
let info_hash = InfoHash([0; 20]);
2222

23-
torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
23+
torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER);
24+
25+
torrent_repository.get_swarm_metadata(&info_hash);
2426
}
2527

2628
start.elapsed()
@@ -37,15 +39,19 @@ where
3739
let handles = FuturesUnordered::new();
3840

3941
// Add the torrent/peer to the torrent repository
40-
torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
42+
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER);
43+
44+
torrent_repository.get_swarm_metadata(info_hash);
4145

4246
let start = Instant::now();
4347

4448
for _ in 0..samples {
4549
let torrent_repository_clone = torrent_repository.clone();
4650

4751
let handle = runtime.spawn(async move {
48-
torrent_repository_clone.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
52+
torrent_repository_clone.upsert_peer(info_hash, &DEFAULT_PEER);
53+
54+
torrent_repository_clone.get_swarm_metadata(info_hash);
4955

5056
if let Some(sleep_time) = sleep {
5157
let start_time = std::time::Instant::now();
@@ -83,7 +89,9 @@ where
8389
let torrent_repository_clone = torrent_repository.clone();
8490

8591
let handle = runtime.spawn(async move {
86-
torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
92+
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER);
93+
94+
torrent_repository_clone.get_swarm_metadata(&info_hash);
8795

8896
if let Some(sleep_time) = sleep {
8997
let start_time = std::time::Instant::now();
@@ -117,7 +125,8 @@ where
117125

118126
// Add the torrents/peers to the torrent repository
119127
for info_hash in &info_hashes {
120-
torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
128+
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER);
129+
torrent_repository.get_swarm_metadata(info_hash);
121130
}
122131

123132
let start = Instant::now();
@@ -126,7 +135,8 @@ where
126135
let torrent_repository_clone = torrent_repository.clone();
127136

128137
let handle = runtime.spawn(async move {
129-
torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
138+
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER);
139+
torrent_repository_clone.get_swarm_metadata(&info_hash);
130140

131141
if let Some(sleep_time) = sleep {
132142
let start_time = std::time::Instant::now();

packages/torrent-repository/src/entry/mod.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub trait Entry {
1515
/// It returns the swarm metadata (statistics) as a struct:
1616
///
1717
/// `(seeders, completed, leechers)`
18-
fn get_stats(&self) -> SwarmMetadata;
18+
fn get_swarm_metadata(&self) -> SwarmMetadata;
1919

2020
/// Returns True if Still a Valid Entry according to the Tracker Policy
2121
fn is_good(&self, policy: &TrackerPolicy) -> bool;
@@ -40,31 +40,27 @@ pub trait Entry {
4040
///
4141
/// The number of peers that have complete downloading is synchronously updated when peers are updated.
4242
/// That's the total torrent downloads counter.
43-
fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool;
44-
45-
// It preforms a combined operation of `insert_or_update_peer` and `get_stats`.
46-
fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata);
43+
fn upsert_peer(&mut self, peer: &peer::Peer) -> bool;
4744

4845
/// It removes peer from the swarm that have not been updated for more than `current_cutoff` seconds
4946
fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch);
5047
}
5148

5249
#[allow(clippy::module_name_repetitions)]
5350
pub trait EntrySync {
54-
fn get_stats(&self) -> SwarmMetadata;
51+
fn get_swarm_metadata(&self) -> SwarmMetadata;
5552
fn is_good(&self, policy: &TrackerPolicy) -> bool;
5653
fn peers_is_empty(&self) -> bool;
5754
fn get_peers_len(&self) -> usize;
5855
fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
5956
fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
60-
fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool;
61-
fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata);
57+
fn upsert_peer(&self, peer: &peer::Peer) -> bool;
6258
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch);
6359
}
6460

6561
#[allow(clippy::module_name_repetitions)]
6662
pub trait EntryAsync {
67-
fn get_stats(&self) -> impl std::future::Future<Output = SwarmMetadata> + Send;
63+
fn get_swarm_metadata(&self) -> impl std::future::Future<Output = SwarmMetadata> + Send;
6864
fn check_good(self, policy: &TrackerPolicy) -> impl std::future::Future<Output = bool> + Send;
6965
fn peers_is_empty(&self) -> impl std::future::Future<Output = bool> + Send;
7066
fn get_peers_len(&self) -> impl std::future::Future<Output = usize> + Send;
@@ -74,11 +70,7 @@ pub trait EntryAsync {
7470
client: &SocketAddr,
7571
limit: Option<usize>,
7672
) -> impl std::future::Future<Output = Vec<Arc<peer::Peer>>> + Send;
77-
fn insert_or_update_peer(self, peer: &peer::Peer) -> impl std::future::Future<Output = bool> + Send;
78-
fn insert_or_update_peer_and_get_stats(
79-
self,
80-
peer: &peer::Peer,
81-
) -> impl std::future::Future<Output = (bool, SwarmMetadata)> + std::marker::Send;
73+
fn upsert_peer(self, peer: &peer::Peer) -> impl std::future::Future<Output = bool> + Send;
8274
fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) -> impl std::future::Future<Output = ()> + Send;
8375
}
8476

packages/torrent-repository/src/entry/mutex_std.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use super::{Entry, EntrySync};
99
use crate::{EntryMutexStd, EntrySingle};
1010

1111
impl EntrySync for EntryMutexStd {
12-
fn get_stats(&self) -> SwarmMetadata {
13-
self.lock().expect("it should get a lock").get_stats()
12+
fn get_swarm_metadata(&self) -> SwarmMetadata {
13+
self.lock().expect("it should get a lock").get_swarm_metadata()
1414
}
1515

1616
fn is_good(&self, policy: &TrackerPolicy) -> bool {
@@ -33,14 +33,8 @@ impl EntrySync for EntryMutexStd {
3333
self.lock().expect("it should get lock").get_peers_for_client(client, limit)
3434
}
3535

36-
fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool {
37-
self.lock().expect("it should lock the entry").insert_or_update_peer(peer)
38-
}
39-
40-
fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata) {
41-
self.lock()
42-
.expect("it should lock the entry")
43-
.insert_or_update_peer_and_get_stats(peer)
36+
fn upsert_peer(&self, peer: &peer::Peer) -> bool {
37+
self.lock().expect("it should lock the entry").upsert_peer(peer)
4438
}
4539

4640
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {

packages/torrent-repository/src/entry/mutex_tokio.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use super::{Entry, EntryAsync};
99
use crate::{EntryMutexTokio, EntrySingle};
1010

1111
impl EntryAsync for EntryMutexTokio {
12-
async fn get_stats(&self) -> SwarmMetadata {
13-
self.lock().await.get_stats()
12+
async fn get_swarm_metadata(&self) -> SwarmMetadata {
13+
self.lock().await.get_swarm_metadata()
1414
}
1515

1616
async fn check_good(self, policy: &TrackerPolicy) -> bool {
@@ -33,12 +33,8 @@ impl EntryAsync for EntryMutexTokio {
3333
self.lock().await.get_peers_for_client(client, limit)
3434
}
3535

36-
async fn insert_or_update_peer(self, peer: &peer::Peer) -> bool {
37-
self.lock().await.insert_or_update_peer(peer)
38-
}
39-
40-
async fn insert_or_update_peer_and_get_stats(self, peer: &peer::Peer) -> (bool, SwarmMetadata) {
41-
self.lock().await.insert_or_update_peer_and_get_stats(peer)
36+
async fn upsert_peer(self, peer: &peer::Peer) -> bool {
37+
self.lock().await.upsert_peer(peer)
4238
}
4339

4440
async fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) {

packages/torrent-repository/src/entry/single.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::EntrySingle;
1212

1313
impl Entry for EntrySingle {
1414
#[allow(clippy::cast_possible_truncation)]
15-
fn get_stats(&self) -> SwarmMetadata {
15+
fn get_swarm_metadata(&self) -> SwarmMetadata {
1616
let complete: u32 = self.peers.values().filter(|peer| peer.is_seeder()).count() as u32;
1717
let incomplete: u32 = self.peers.len() as u32 - complete;
1818

@@ -70,7 +70,7 @@ impl Entry for EntrySingle {
7070
}
7171
}
7272

73-
fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool {
73+
fn upsert_peer(&mut self, peer: &peer::Peer) -> bool {
7474
let mut downloaded_stats_updated: bool = false;
7575

7676
match peer::ReadInfo::get_event(peer) {
@@ -93,12 +93,6 @@ impl Entry for EntrySingle {
9393
downloaded_stats_updated
9494
}
9595

96-
fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) {
97-
let changed = self.insert_or_update_peer(peer);
98-
let stats = self.get_stats();
99-
(changed, stats)
100-
}
101-
10296
fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) {
10397
self.peers
10498
.retain(|_, peer| peer::ReadInfo::get_updated(peer) > current_cutoff);

packages/torrent-repository/src/repository/dash_map_mutex_std.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,21 @@ where
2323
EntryMutexStd: EntrySync,
2424
EntrySingle: Entry,
2525
{
26-
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
26+
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) {
2727
if let Some(entry) = self.torrents.get(info_hash) {
28-
entry.insert_or_update_peer_and_get_stats(peer)
28+
entry.upsert_peer(peer);
2929
} else {
3030
let _unused = self.torrents.insert(*info_hash, Arc::default());
31-
32-
match self.torrents.get(info_hash) {
33-
Some(entry) => entry.insert_or_update_peer_and_get_stats(peer),
34-
None => (false, SwarmMetadata::zeroed()),
31+
if let Some(entry) = self.torrents.get(info_hash) {
32+
entry.upsert_peer(peer);
3533
}
3634
}
3735
}
3836

37+
fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata> {
38+
self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata())
39+
}
40+
3941
fn get(&self, key: &InfoHash) -> Option<EntryMutexStd> {
4042
let maybe_entry = self.torrents.get(key);
4143
maybe_entry.map(|entry| entry.clone())
@@ -45,7 +47,7 @@ where
4547
let mut metrics = TorrentsMetrics::default();
4648

4749
for entry in &self.torrents {
48-
let stats = entry.value().lock().expect("it should get a lock").get_stats();
50+
let stats = entry.value().lock().expect("it should get a lock").get_swarm_metadata();
4951
metrics.complete += u64::from(stats.complete);
5052
metrics.downloaded += u64::from(stats.downloaded);
5153
metrics.incomplete += u64::from(stats.incomplete);

packages/torrent-repository/src/repository/mod.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ pub trait Repository<T>: Debug + Default + Sized + 'static {
2424
fn remove(&self, key: &InfoHash) -> Option<T>;
2525
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch);
2626
fn remove_peerless_torrents(&self, policy: &TrackerPolicy);
27-
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata);
27+
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer);
28+
fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata>;
2829
}
2930

3031
#[allow(clippy::module_name_repetitions)]
@@ -36,9 +37,6 @@ pub trait RepositoryAsync<T>: Debug + Default + Sized + 'static {
3637
fn remove(&self, key: &InfoHash) -> impl std::future::Future<Output = Option<T>> + Send;
3738
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> impl std::future::Future<Output = ()> + Send;
3839
fn remove_peerless_torrents(&self, policy: &TrackerPolicy) -> impl std::future::Future<Output = ()> + Send;
39-
fn update_torrent_with_peer_and_get_stats(
40-
&self,
41-
info_hash: &InfoHash,
42-
peer: &peer::Peer,
43-
) -> impl std::future::Future<Output = (bool, SwarmMetadata)> + Send;
40+
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) -> impl std::future::Future<Output = ()> + Send;
41+
fn get_swarm_metadata(&self, info_hash: &InfoHash) -> impl std::future::Future<Output = Option<SwarmMetadata>> + Send;
4442
}

packages/torrent-repository/src/repository/rw_lock_std.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,16 @@ impl Repository<EntrySingle> for TorrentsRwLockStd
4747
where
4848
EntrySingle: Entry,
4949
{
50-
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
50+
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) {
5151
let mut db = self.get_torrents_mut();
5252

5353
let entry = db.entry(*info_hash).or_insert(EntrySingle::default());
5454

55-
entry.insert_or_update_peer_and_get_stats(peer)
55+
entry.upsert_peer(peer);
56+
}
57+
58+
fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata> {
59+
self.get(info_hash).map(|entry| entry.get_swarm_metadata())
5660
}
5761

5862
fn get(&self, key: &InfoHash) -> Option<EntrySingle> {
@@ -64,7 +68,7 @@ where
6468
let mut metrics = TorrentsMetrics::default();
6569

6670
for entry in self.get_torrents().values() {
67-
let stats = entry.get_stats();
71+
let stats = entry.get_swarm_metadata();
6872
metrics.complete += u64::from(stats.complete);
6973
metrics.downloaded += u64::from(stats.downloaded);
7074
metrics.incomplete += u64::from(stats.incomplete);

0 commit comments

Comments
 (0)