Skip to content

Commit 9159187

Browse files
committed
First implementation and iteration of using crossbeam to eventually handle the torrent data, should improve and minimize the memory usage drastically, so to keep the memory object at a single place
1 parent ff4ca58 commit 9159187

File tree

3 files changed

+128
-0
lines changed

3 files changed

+128
-0
lines changed

Cargo.lock

Lines changed: 71 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,5 @@ aquatic_udp_protocol = { git = "https://github.com/greatest-ape/aquatic" }
3939
futures = "0.3.21"
4040
async-trait = "0.1.52"
4141
dhat = "0.3.0"
42+
crossbeam = "0.8.1"
43+
crossbeam-channel = "0.5.4"

src/main.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::net::SocketAddr;
22
use std::sync::Arc;
3+
use crossbeam_channel::bounded;
34
use log::{info};
45
use tokio::task::JoinHandle;
56
use torrust_tracker::{Configuration, http_api_server, HttpApiConfig, HttpTrackerConfig, logging, TorrentTracker, UdpServer, UdpTrackerConfig};
@@ -9,11 +10,28 @@ use torrust_tracker::torrust_http_tracker::server::HttpServer;
910
#[global_allocator]
1011
static ALLOC: dhat::Alloc = dhat::Alloc;
1112

13+
pub struct DataStream {
14+
action: u8,
15+
data: Vec<()>
16+
}
17+
1218
#[tokio::main]
1319
async fn main() {
1420
#[cfg(feature = "dhat-heap")]
1521
let _profiler = dhat::Profiler::new_heap();
1622

23+
// Loading configuration
24+
let config = match Configuration::load_from_file() {
25+
Ok(config) => config,
26+
Err(error) => {
27+
panic!("{}", error)
28+
}
29+
};
30+
31+
// Start the thread where data is being exchanged for usaga
32+
let (sender, receiver): (crossbeam_channel::Sender<DataStream>, crossbeam_channel::Receiver<DataStream>) = bounded(1);
33+
let _torrents_memory_handler = start_torrents_memory_handler(&config, sender.clone(), receiver.clone());
34+
1735
// torrust config
1836
let config = match Configuration::load_from_file() {
1937
Ok(config) => Arc::new(config),
@@ -97,10 +115,47 @@ async fn main() {
97115
let _ = tracker.periodic_saving().await;
98116
info!("Torrents saved");
99117
}
118+
119+
// Closing down channel
120+
sender.clone().send(DataStream{
121+
action: ACTION_CLOSE_CHANNEL,
122+
data: Vec::new()
123+
});
100124
}
101125
}
102126
}
103127

128+
129+
const ACTION_CLOSE_CHANNEL: u8 = 0;
130+
const ACTION_READ_TORRENTS: u8 = 1;
131+
const ACTION_WRITE_TORRENTS: u8 = 2;
132+
const ACTION_UPDATE_TORRENTS: u8 = 3;
133+
const ACTION_READ_PEERS: u8 = 4;
134+
const ACTION_WRITE_PEERS: u8 = 5;
135+
const ACTION_UPDATE_PEERS: u8 = 6;
136+
fn start_torrents_memory_handler(config: &Configuration, sender: crossbeam_channel::Sender<DataStream>, receiver: crossbeam_channel::Receiver<DataStream>) -> Option<JoinHandle<()>> {
137+
// This is our main memory handler, everything will be received, handled and send back.
138+
return Some(tokio::spawn(async move {
139+
loop {
140+
// Wait for incoming data.
141+
let data: DataStream = receiver.recv().unwrap();
142+
143+
// Lets check what action is given.
144+
match data.action {
145+
ACTION_CLOSE_CHANNEL => {
146+
info!("Ending the memory handler thread...");
147+
sender.send(DataStream{
148+
action: ACTION_CLOSE_CHANNEL,
149+
data: Vec::new()
150+
});
151+
break;
152+
}
153+
_ => {}
154+
}
155+
}
156+
}));
157+
}
158+
104159
fn start_torrent_periodic_job(config: Arc<Configuration>, tracker: Arc<TorrentTracker>) -> Option<JoinHandle<()>> {
105160
let weak_tracker = std::sync::Arc::downgrade(&tracker);
106161
let interval = config.persistence_interval.unwrap_or(900);

0 commit comments

Comments
 (0)