@@ -439,14 +439,13 @@ pub mod services;
439439pub mod statistics;
440440pub mod torrent;
441441
442- use std:: collections:: { BTreeMap , HashMap } ;
442+ use std:: collections:: HashMap ;
443443use std:: net:: IpAddr ;
444444use std:: panic:: Location ;
445445use std:: sync:: Arc ;
446446use std:: time:: Duration ;
447447
448448use derive_more:: Constructor ;
449- use futures:: future:: join_all;
450449use log:: debug;
451450use tokio:: sync:: mpsc:: error:: SendError ;
452451use torrust_tracker_configuration:: { AnnouncePolicy , Configuration } ;
@@ -455,8 +454,8 @@ use torrust_tracker_primitives::TrackerMode;
455454use self :: auth:: Key ;
456455use self :: error:: Error ;
457456use self :: peer:: Peer ;
458- use self :: torrent:: repository_asyn:: { RepositoryAsync , RepositoryTokioRwLock } ;
459- use self :: torrent:: { Entry , UpdateTorrentAsync } ;
457+ use self :: torrent:: repository_asyn:: RepositoryTokioRwLock ;
458+ use self :: torrent:: { Entry , Repository , UpdateTorrentAsync } ;
460459use crate :: core:: databases:: Database ;
461460use crate :: core:: torrent:: { SwarmMetadata , SwarmStats } ;
462461use crate :: shared:: bit_torrent:: info_hash:: InfoHash ;
@@ -685,9 +684,7 @@ impl Tracker {
685684
686685 /// It returns the data for a `scrape` response.
687686 async fn get_swarm_metadata ( & self , info_hash : & InfoHash ) -> SwarmMetadata {
688- let torrents = self . torrents . get_torrents ( ) . await ;
689-
690- match torrents. get ( info_hash) {
687+ match self . torrents . get ( info_hash) . await {
691688 Some ( torrent_entry) => torrent_entry. get_swarm_metadata ( ) ,
692689 None => SwarmMetadata :: default ( ) ,
693690 }
@@ -704,29 +701,13 @@ impl Tracker {
704701 pub async fn load_torrents_from_database ( & self ) -> Result < ( ) , databases:: error:: Error > {
705702 let persistent_torrents = self . database . load_persistent_torrents ( ) . await ?;
706703
707- let mut torrents = self . torrents . get_torrents_mut ( ) . await ;
708-
709- for ( info_hash, completed) in persistent_torrents {
710- // Skip if torrent entry already exists
711- if torrents. contains_key ( & info_hash) {
712- continue ;
713- }
714-
715- let torrent_entry = torrent:: Entry {
716- peers : BTreeMap :: default ( ) ,
717- completed,
718- } ;
719-
720- torrents. insert ( info_hash, torrent_entry) ;
721- }
704+ self . torrents . import_persistent ( & persistent_torrents) . await ;
722705
723706 Ok ( ( ) )
724707 }
725708
726709 async fn get_torrent_peers_for_peer ( & self , info_hash : & InfoHash , peer : & Peer ) -> Vec < peer:: Peer > {
727- let read_lock = self . torrents . get_torrents ( ) . await ;
728-
729- match read_lock. get ( info_hash) {
710+ match self . torrents . get ( info_hash) . await {
730711 None => vec ! [ ] ,
731712 Some ( entry) => entry
732713 . get_peers_for_peer ( peer, TORRENT_PEERS_LIMIT )
@@ -740,9 +721,7 @@ impl Tracker {
740721 ///
741722 /// Get all torrent peers for a given torrent
742723 pub async fn get_torrent_peers ( & self , info_hash : & InfoHash ) -> Vec < peer:: Peer > {
743- let read_lock = self . torrents . get_torrents ( ) . await ;
744-
745- match read_lock. get ( info_hash) {
724+ match self . torrents . get ( info_hash) . await {
746725 None => vec ! [ ] ,
747726 Some ( entry) => entry. get_peers ( TORRENT_PEERS_LIMIT ) . into_iter ( ) . copied ( ) . collect ( ) ,
748727 }
@@ -757,7 +736,7 @@ impl Tracker {
757736 // code-review: consider splitting the function in two (command and query segregation).
758737 // `update_torrent_with_peer` and `get_stats`
759738
760- let ( stats , stats_updated ) = self . torrents . update_torrent_with_peer_and_get_stats ( info_hash, peer) . await ;
739+ let ( stats_updated , stats ) = self . torrents . update_torrent_with_peer_and_get_stats ( info_hash, peer) . await ;
761740
762741 if self . policy . persistent_torrent_completed_stat && stats_updated {
763742 let completed = stats. downloaded ;
@@ -777,71 +756,18 @@ impl Tracker {
777756 /// # Panics
778757 /// Panics if unable to get the torrent metrics.
779758 pub async fn get_torrents_metrics ( & self ) -> TorrentsMetrics {
780- let arc_torrents_metrics = Arc :: new ( tokio:: sync:: Mutex :: new ( TorrentsMetrics {
781- seeders : 0 ,
782- completed : 0 ,
783- leechers : 0 ,
784- torrents : 0 ,
785- } ) ) ;
786-
787- let db = self . torrents . get_torrents ( ) . await . clone ( ) ;
788-
789- let futures = db
790- . values ( )
791- . map ( |torrent_entry| {
792- let torrent_entry = torrent_entry. clone ( ) ;
793- let torrents_metrics = arc_torrents_metrics. clone ( ) ;
794-
795- async move {
796- tokio:: spawn ( async move {
797- let ( seeders, completed, leechers) = torrent_entry. get_stats ( ) ;
798- torrents_metrics. lock ( ) . await . seeders += u64:: from ( seeders) ;
799- torrents_metrics. lock ( ) . await . completed += u64:: from ( completed) ;
800- torrents_metrics. lock ( ) . await . leechers += u64:: from ( leechers) ;
801- torrents_metrics. lock ( ) . await . torrents += 1 ;
802- } )
803- . await
804- . expect ( "Error torrent_metrics spawn" ) ;
805- }
806- } )
807- . collect :: < Vec < _ > > ( ) ;
808-
809- join_all ( futures) . await ;
810-
811- let torrents_metrics = Arc :: try_unwrap ( arc_torrents_metrics) . expect ( "Could not unwrap arc_torrents_metrics" ) ;
812-
813- torrents_metrics. into_inner ( )
759+ self . torrents . get_metrics ( ) . await
814760 }
815761
816762 /// Remove inactive peers and (optionally) peerless torrents
817763 ///
818764 /// # Context: Tracker
819765 pub async fn cleanup_torrents ( & self ) {
820- let mut torrents_lock = self . torrents . get_torrents_mut ( ) . await ;
821-
822766 // If we don't need to remove torrents we will use the faster iter
823767 if self . policy . remove_peerless_torrents {
824- let mut cleaned_torrents_map: BTreeMap < InfoHash , torrent:: Entry > = BTreeMap :: new ( ) ;
825-
826- for ( info_hash, torrent_entry) in & mut * torrents_lock {
827- torrent_entry. remove_inactive_peers ( self . policy . max_peer_timeout ) ;
828-
829- if torrent_entry. peers . is_empty ( ) {
830- continue ;
831- }
832-
833- if self . policy . persistent_torrent_completed_stat && torrent_entry. completed == 0 {
834- continue ;
835- }
836-
837- cleaned_torrents_map. insert ( * info_hash, torrent_entry. clone ( ) ) ;
838- }
839-
840- * torrents_lock = cleaned_torrents_map;
768+ self . torrents . remove_peerless_torrents ( & self . policy ) . await ;
841769 } else {
842- for torrent_entry in ( * torrents_lock) . values_mut ( ) {
843- torrent_entry. remove_inactive_peers ( self . policy . max_peer_timeout ) ;
844- }
770+ self . torrents . remove_inactive_peers ( self . policy . max_peer_timeout ) . await ;
845771 }
846772 }
847773
@@ -1755,7 +1681,7 @@ mod tests {
17551681 use aquatic_udp_protocol:: AnnounceEvent ;
17561682
17571683 use crate :: core:: tests:: the_tracker:: { sample_info_hash, sample_peer, tracker_persisting_torrents_in_database} ;
1758- use crate :: core:: torrent:: repository_asyn :: RepositoryAsync ;
1684+ use crate :: core:: torrent:: Repository ;
17591685
17601686 #[ tokio:: test]
17611687 async fn it_should_persist_the_number_of_completed_peers_for_all_torrents_into_the_database ( ) {
@@ -1774,14 +1700,15 @@ mod tests {
17741700 assert_eq ! ( swarm_stats. downloaded, 1 ) ;
17751701
17761702 // Remove the newly updated torrent from memory
1777- tracker. torrents . get_torrents_mut ( ) . await . remove ( & info_hash) ;
1703+ tracker. torrents . remove ( & info_hash) . await ;
17781704
17791705 tracker. load_torrents_from_database ( ) . await . unwrap ( ) ;
17801706
1781- let torrents = tracker. torrents . get_torrents ( ) . await ;
1782- assert ! ( torrents. contains_key( & info_hash) ) ;
1783-
1784- let torrent_entry = torrents. get ( & info_hash) . unwrap ( ) ;
1707+ let torrent_entry = tracker
1708+ . torrents
1709+ . get ( & info_hash)
1710+ . await
1711+ . expect ( "it should be able to get entry" ) ;
17851712
17861713 // It persists the number of completed peers.
17871714 assert_eq ! ( torrent_entry. completed, 1 ) ;
0 commit comments