diff --git a/Cargo.lock b/Cargo.lock index fa1d724e2..fc45cfc57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1311,6 +1311,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hex-literal" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" + [[package]] name = "http" version = "0.2.11" @@ -3439,6 +3445,7 @@ dependencies = [ "derive_more", "fern", "futures", + "hex-literal", "hyper 1.1.0", "lazy_static", "local-ip-address", diff --git a/Cargo.toml b/Cargo.toml index 1418f23dd..bd04e1cc1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ url = "2.5.0" tempfile = "3.9.0" clap = { version = "4.4.18", features = ["derive", "env"]} anyhow = "1.0.79" +hex-literal = "0.4.1" [dev-dependencies] criterion = { version = "0.5.1", features = ["async_tokio"] } diff --git a/src/console/clients/checker/app.rs b/src/console/clients/checker/app.rs index bca4b64dc..82ea800d0 100644 --- a/src/console/clients/checker/app.rs +++ b/src/console/clients/checker/app.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use anyhow::{Context, Result}; use clap::Parser; +use log::{debug, LevelFilter}; use super::config::Configuration; use super::console::Console; @@ -39,6 +40,8 @@ struct Args { /// /// Will return an error if the configuration was not provided. pub async fn run() -> Result> { + setup_logging(LevelFilter::Info); + let args = Args::parse(); let config = setup_config(args)?; @@ -53,6 +56,27 @@ pub async fn run() -> Result> { Ok(service.run_checks().await) } +fn setup_logging(level: LevelFilter) { + if let Err(_err) = fern::Dispatch::new() + .format(|out, message, record| { + out.finish(format_args!( + "{} [{}][{}] {}", + chrono::Local::now().format("%+"), + record.target(), + record.level(), + message + )); + }) + .level(level) + .chain(std::io::stdout()) + .apply() + { + panic!("Failed to initialize logging.") + } + + debug!("logging initialized."); +} + fn setup_config(args: Args) -> Result { match (args.config_path, args.config_content) { (Some(config_path), _) => load_config_from_file(&config_path), diff --git a/src/console/clients/checker/checks/health.rs b/src/console/clients/checker/checks/health.rs new file mode 100644 index 000000000..9c28da514 --- /dev/null +++ b/src/console/clients/checker/checks/health.rs @@ -0,0 +1,51 @@ +use std::time::Duration; + +use colored::Colorize; +use reqwest::{Client as HttpClient, Url, Url as ServiceUrl}; + +use crate::console::clients::checker::console::Console; +use crate::console::clients::checker::printer::Printer; +use crate::console::clients::checker::service::{CheckError, CheckResult}; + +pub async fn run(health_checks: &Vec, console: &Console, check_results: &mut Vec) { + console.println("Health checks ..."); + + for health_check_url in health_checks { + match run_health_check(health_check_url.clone(), console).await { + Ok(()) => check_results.push(Ok(())), + Err(err) => check_results.push(Err(err)), + } + } +} + +async fn run_health_check(url: Url, console: &Console) -> Result<(), CheckError> { + let client = HttpClient::builder().timeout(Duration::from_secs(5)).build().unwrap(); + + let colored_url = url.to_string().yellow(); + + match client.get(url.clone()).send().await { + Ok(response) => { + if response.status().is_success() { + console.println(&format!("{} - Health API at {} is OK", "✓".green(), colored_url)); + Ok(()) + } else { + console.eprintln(&format!( + "{} - Health API at {} is failing: {:?}", + "✗".red(), + colored_url, + response + )); + Err(CheckError::HealthCheckError { url }) + } + } + Err(err) => { + console.eprintln(&format!( + "{} - Health API at {} is failing: {:?}", + "✗".red(), + colored_url, + err + )); + Err(CheckError::HealthCheckError { url }) + } + } +} diff --git a/src/console/clients/checker/checks/http.rs b/src/console/clients/checker/checks/http.rs new file mode 100644 index 000000000..df1e9bc9a --- /dev/null +++ b/src/console/clients/checker/checks/http.rs @@ -0,0 +1,95 @@ +use std::str::FromStr; + +use colored::Colorize; +use log::debug; +use reqwest::Url as ServiceUrl; +use url::Url; + +use crate::console::clients::checker::console::Console; +use crate::console::clients::checker::printer::Printer; +use crate::console::clients::checker::service::{CheckError, CheckResult}; +use crate::shared::bit_torrent::info_hash::InfoHash; +use crate::shared::bit_torrent::tracker::http::client::requests::announce::QueryBuilder; +use crate::shared::bit_torrent::tracker::http::client::responses::announce::Announce; +use crate::shared::bit_torrent::tracker::http::client::responses::scrape; +use crate::shared::bit_torrent::tracker::http::client::{requests, Client}; + +pub async fn run(http_trackers: &Vec, console: &Console, check_results: &mut Vec) { + console.println("HTTP trackers ..."); + + for http_tracker in http_trackers { + let colored_tracker_url = http_tracker.to_string().yellow(); + + match check_http_announce(http_tracker).await { + Ok(()) => { + check_results.push(Ok(())); + console.println(&format!("{} - Announce at {} is OK", "✓".green(), colored_tracker_url)); + } + Err(err) => { + check_results.push(Err(err)); + console.println(&format!("{} - Announce at {} is failing", "✗".red(), colored_tracker_url)); + } + } + + match check_http_scrape(http_tracker).await { + Ok(()) => { + check_results.push(Ok(())); + console.println(&format!("{} - Scrape at {} is OK", "✓".green(), colored_tracker_url)); + } + Err(err) => { + check_results.push(Err(err)); + console.println(&format!("{} - Scrape at {} is failing", "✗".red(), colored_tracker_url)); + } + } + } +} + +async fn check_http_announce(tracker_url: &Url) -> Result<(), CheckError> { + let info_hash_str = "9c38422213e30bff212b30c360d26f9a02136422".to_string(); // # DevSkim: ignore DS173237 + let info_hash = InfoHash::from_str(&info_hash_str).expect("a valid info-hash is required"); + + // todo: HTTP request could panic.For example, if the server is not accessible. + // We should change the client to catch that error and return a `CheckError`. + // Otherwise the checking process will stop. The idea is to process all checks + // and return a final report. + let response = Client::new(tracker_url.clone()) + .announce(&QueryBuilder::with_default_values().with_info_hash(&info_hash).query()) + .await; + + if let Ok(body) = response.bytes().await { + if let Ok(_announce_response) = serde_bencode::from_bytes::(&body) { + Ok(()) + } else { + debug!("announce body {:#?}", body); + Err(CheckError::HttpError { + url: tracker_url.clone(), + }) + } + } else { + Err(CheckError::HttpError { + url: tracker_url.clone(), + }) + } +} + +async fn check_http_scrape(url: &Url) -> Result<(), CheckError> { + let info_hashes: Vec = vec!["9c38422213e30bff212b30c360d26f9a02136422".to_string()]; // # DevSkim: ignore DS173237 + let query = requests::scrape::Query::try_from(info_hashes).expect("a valid array of info-hashes is required"); + + // todo: HTTP request could panic.For example, if the server is not accessible. + // We should change the client to catch that error and return a `CheckError`. + // Otherwise the checking process will stop. The idea is to process all checks + // and return a final report. + let response = Client::new(url.clone()).scrape(&query).await; + + if let Ok(body) = response.bytes().await { + if let Ok(_scrape_response) = scrape::Response::try_from_bencoded(&body) { + Ok(()) + } else { + debug!("scrape body {:#?}", body); + Err(CheckError::HttpError { url: url.clone() }) + } + } else { + Err(CheckError::HttpError { url: url.clone() }) + } +} diff --git a/src/console/clients/checker/checks/mod.rs b/src/console/clients/checker/checks/mod.rs new file mode 100644 index 000000000..16256595e --- /dev/null +++ b/src/console/clients/checker/checks/mod.rs @@ -0,0 +1,3 @@ +pub mod health; +pub mod http; +pub mod udp; diff --git a/src/console/clients/checker/checks/udp.rs b/src/console/clients/checker/checks/udp.rs new file mode 100644 index 000000000..890375b75 --- /dev/null +++ b/src/console/clients/checker/checks/udp.rs @@ -0,0 +1,87 @@ +use std::net::SocketAddr; + +use aquatic_udp_protocol::{Port, TransactionId}; +use colored::Colorize; +use hex_literal::hex; +use log::debug; + +use crate::console::clients::checker::console::Console; +use crate::console::clients::checker::printer::Printer; +use crate::console::clients::checker::service::{CheckError, CheckResult}; +use crate::console::clients::udp::checker; +use crate::shared::bit_torrent::info_hash::InfoHash; + +const ASSIGNED_BY_OS: u16 = 0; +const RANDOM_TRANSACTION_ID: i32 = -888_840_697; + +pub async fn run(udp_trackers: &Vec, console: &Console, check_results: &mut Vec) { + console.println("UDP trackers ..."); + + for udp_tracker in udp_trackers { + debug!("UDP tracker: {:?}", udp_tracker); + + let colored_tracker_url = udp_tracker.to_string().yellow(); + + let transaction_id = TransactionId(RANDOM_TRANSACTION_ID); + + let mut client = checker::Client::default(); + + debug!("Bind and connect"); + + let Ok(bound_to) = client.bind_and_connect(ASSIGNED_BY_OS, udp_tracker).await else { + check_results.push(Err(CheckError::UdpError { + socket_addr: *udp_tracker, + })); + console.println(&format!("{} - Can't connect to socket {}", "✗".red(), colored_tracker_url)); + break; + }; + + debug!("Send connection request"); + + let Ok(connection_id) = client.send_connection_request(transaction_id).await else { + check_results.push(Err(CheckError::UdpError { + socket_addr: *udp_tracker, + })); + console.println(&format!( + "{} - Can't make tracker connection request to {}", + "✗".red(), + colored_tracker_url + )); + break; + }; + + let info_hash = InfoHash(hex!("9c38422213e30bff212b30c360d26f9a02136422")); // # DevSkim: ignore DS173237 + + debug!("Send announce request"); + + if (client + .send_announce_request(connection_id, transaction_id, info_hash, Port(bound_to.port())) + .await) + .is_ok() + { + check_results.push(Ok(())); + console.println(&format!("{} - Announce at {} is OK", "✓".green(), colored_tracker_url)); + } else { + let err = CheckError::UdpError { + socket_addr: *udp_tracker, + }; + check_results.push(Err(err)); + console.println(&format!("{} - Announce at {} is failing", "✗".red(), colored_tracker_url)); + } + + debug!("Send scrape request"); + + let info_hashes = vec![InfoHash(hex!("9c38422213e30bff212b30c360d26f9a02136422"))]; // # DevSkim: ignore DS173237 + + if (client.send_scrape_request(connection_id, transaction_id, info_hashes).await).is_ok() { + check_results.push(Ok(())); + console.println(&format!("{} - Announce at {} is OK", "✓".green(), colored_tracker_url)); + } else { + let err = CheckError::UdpError { + socket_addr: *udp_tracker, + }; + check_results.push(Err(err)); + console.println(&format!("{} - Announce at {} is failing", "✗".red(), colored_tracker_url)); + } + } +} diff --git a/src/console/clients/checker/mod.rs b/src/console/clients/checker/mod.rs index 6a55141d5..d26a4a686 100644 --- a/src/console/clients/checker/mod.rs +++ b/src/console/clients/checker/mod.rs @@ -1,4 +1,5 @@ pub mod app; +pub mod checks; pub mod config; pub mod console; pub mod logger; diff --git a/src/console/clients/checker/service.rs b/src/console/clients/checker/service.rs index 1cb4725e0..94eff4a88 100644 --- a/src/console/clients/checker/service.rs +++ b/src/console/clients/checker/service.rs @@ -1,19 +1,12 @@ use std::net::SocketAddr; -use std::str::FromStr; use std::sync::Arc; -use std::time::Duration; -use colored::Colorize; -use reqwest::{Client as HttpClient, Url}; +use reqwest::Url; +use super::checks; use super::config::Configuration; use super::console::Console; use crate::console::clients::checker::printer::Printer; -use crate::shared::bit_torrent::info_hash::InfoHash; -use crate::shared::bit_torrent::tracker::http::client::requests::announce::QueryBuilder; -use crate::shared::bit_torrent::tracker::http::client::responses::announce::Announce; -use crate::shared::bit_torrent::tracker::http::client::responses::scrape; -use crate::shared::bit_torrent::tracker::http::client::{requests, Client}; pub struct Service { pub(crate) config: Arc, @@ -24,7 +17,7 @@ pub type CheckResult = Result<(), CheckError>; #[derive(Debug)] pub enum CheckError { - UdpError, + UdpError { socket_addr: SocketAddr }, HttpError { url: Url }, HealthCheckError { url: Url }, } @@ -38,149 +31,12 @@ impl Service { let mut check_results = vec![]; - self.check_udp_trackers(); + checks::udp::run(&self.config.udp_trackers, &self.console, &mut check_results).await; - self.check_http_trackers(&mut check_results).await; + checks::http::run(&self.config.http_trackers, &self.console, &mut check_results).await; - self.run_health_checks(&mut check_results).await; + checks::health::run(&self.config.health_checks, &self.console, &mut check_results).await; check_results } - - fn check_udp_trackers(&self) { - self.console.println("UDP trackers ..."); - - for udp_tracker in &self.config.udp_trackers { - self.check_udp_tracker(udp_tracker); - } - } - - async fn check_http_trackers(&self, check_results: &mut Vec) { - self.console.println("HTTP trackers ..."); - - for http_tracker in &self.config.http_trackers { - let colored_tracker_url = http_tracker.to_string().yellow(); - - match self.check_http_announce(http_tracker).await { - Ok(()) => { - check_results.push(Ok(())); - self.console - .println(&format!("{} - Announce at {} is OK", "✓".green(), colored_tracker_url)); - } - Err(err) => { - check_results.push(Err(err)); - self.console - .println(&format!("{} - Announce at {} is failing", "✗".red(), colored_tracker_url)); - } - } - - match self.check_http_scrape(http_tracker).await { - Ok(()) => { - check_results.push(Ok(())); - self.console - .println(&format!("{} - Scrape at {} is OK", "✓".green(), colored_tracker_url)); - } - Err(err) => { - check_results.push(Err(err)); - self.console - .println(&format!("{} - Scrape at {} is failing", "✗".red(), colored_tracker_url)); - } - } - } - } - - async fn run_health_checks(&self, check_results: &mut Vec) { - self.console.println("Health checks ..."); - - for health_check_url in &self.config.health_checks { - match self.run_health_check(health_check_url.clone()).await { - Ok(()) => check_results.push(Ok(())), - Err(err) => check_results.push(Err(err)), - } - } - } - - fn check_udp_tracker(&self, address: &SocketAddr) { - // todo: - // - Make announce request - // - Make scrape request - - let colored_address = address.to_string().yellow(); - - self.console.println(&format!( - "{} - UDP tracker at udp://{} is OK ({})", - "✓".green(), - colored_address, - "TODO".red(), - )); - } - - async fn check_http_announce(&self, url: &Url) -> Result<(), CheckError> { - let info_hash_str = "9c38422213e30bff212b30c360d26f9a02136422".to_string(); // # DevSkim: ignore DS173237 - let info_hash = InfoHash::from_str(&info_hash_str).expect("a valid info-hash is required"); - - let response = Client::new(url.clone()) - .announce(&QueryBuilder::with_default_values().with_info_hash(&info_hash).query()) - .await; - - if let Ok(body) = response.bytes().await { - if let Ok(_announce_response) = serde_bencode::from_bytes::(&body) { - Ok(()) - } else { - Err(CheckError::HttpError { url: url.clone() }) - } - } else { - Err(CheckError::HttpError { url: url.clone() }) - } - } - - async fn check_http_scrape(&self, url: &Url) -> Result<(), CheckError> { - let info_hashes: Vec = vec!["9c38422213e30bff212b30c360d26f9a02136422".to_string()]; // # DevSkim: ignore DS173237 - let query = requests::scrape::Query::try_from(info_hashes).expect("a valid array of info-hashes is required"); - - let response = Client::new(url.clone()).scrape(&query).await; - - if let Ok(body) = response.bytes().await { - if let Ok(_scrape_response) = scrape::Response::try_from_bencoded(&body) { - Ok(()) - } else { - Err(CheckError::HttpError { url: url.clone() }) - } - } else { - Err(CheckError::HttpError { url: url.clone() }) - } - } - - async fn run_health_check(&self, url: Url) -> Result<(), CheckError> { - let client = HttpClient::builder().timeout(Duration::from_secs(5)).build().unwrap(); - - let colored_url = url.to_string().yellow(); - - match client.get(url.clone()).send().await { - Ok(response) => { - if response.status().is_success() { - self.console - .println(&format!("{} - Health API at {} is OK", "✓".green(), colored_url)); - Ok(()) - } else { - self.console.eprintln(&format!( - "{} - Health API at {} is failing: {:?}", - "✗".red(), - colored_url, - response - )); - Err(CheckError::HealthCheckError { url }) - } - } - Err(err) => { - self.console.eprintln(&format!( - "{} - Health API at {} is failing: {:?}", - "✗".red(), - colored_url, - err - )); - Err(CheckError::HealthCheckError { url }) - } - } - } } diff --git a/src/console/clients/udp/app.rs b/src/console/clients/udp/app.rs index e9c8b5274..b9e31155d 100644 --- a/src/console/clients/udp/app.rs +++ b/src/console/clients/udp/app.rs @@ -56,25 +56,21 @@ //! ``` //! //! The protocol (`udp://`) in the URL is mandatory. The path (`\scrape`) is optional. It always uses `\scrape`. -use std::net::{Ipv4Addr, SocketAddr, ToSocketAddrs}; +use std::net::{SocketAddr, ToSocketAddrs}; use std::str::FromStr; use anyhow::Context; -use aquatic_udp_protocol::common::InfoHash; -use aquatic_udp_protocol::Response::{AnnounceIpv4, AnnounceIpv6, Scrape}; -use aquatic_udp_protocol::{ - AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, Port, Response, - ScrapeRequest, TransactionId, -}; +use aquatic_udp_protocol::Response::{self, AnnounceIpv4, AnnounceIpv6, Scrape}; +use aquatic_udp_protocol::{Port, TransactionId}; use clap::{Parser, Subcommand}; use log::{debug, LevelFilter}; -use serde_json::json; use url::Url; +use crate::console::clients::udp::checker; +use crate::console::clients::udp::responses::{AnnounceResponseDto, ScrapeResponseDto}; use crate::shared::bit_torrent::info_hash::InfoHash as TorrustInfoHash; -use crate::shared::bit_torrent::tracker::udp::client::{UdpClient, UdpTrackerClient}; -const ASSIGNED_BY_OS: i32 = 0; +const ASSIGNED_BY_OS: u16 = 0; const RANDOM_TRANSACTION_ID: i32 = -888_840_697; #[derive(Parser, Debug)] @@ -110,83 +106,18 @@ pub async fn run() -> anyhow::Result<()> { let args = Args::parse(); - // Configuration - let local_port = ASSIGNED_BY_OS; - let local_bind_to = format!("0.0.0.0:{local_port}"); - let transaction_id = RANDOM_TRANSACTION_ID; - - // Bind to local port - debug!("Binding to: {local_bind_to}"); - let udp_client = UdpClient::bind(&local_bind_to).await; - let bound_to = udp_client.socket.local_addr().context("binding local address")?; - debug!("Bound to: {bound_to}"); - - let transaction_id = TransactionId(transaction_id); - let response = match args.command { Command::Announce { tracker_socket_addr, info_hash, - } => { - let (connection_id, udp_tracker_client) = connect(&tracker_socket_addr, udp_client, transaction_id).await; - - send_announce_request( - connection_id, - transaction_id, - info_hash, - Port(bound_to.port()), - &udp_tracker_client, - ) - .await - } + } => handle_announce(&tracker_socket_addr, &info_hash).await?, Command::Scrape { tracker_socket_addr, info_hashes, - } => { - let (connection_id, udp_tracker_client) = connect(&tracker_socket_addr, udp_client, transaction_id).await; - send_scrape_request(connection_id, transaction_id, info_hashes, &udp_tracker_client).await - } + } => handle_scrape(&tracker_socket_addr, &info_hashes).await?, }; - match response { - AnnounceIpv4(announce) => { - let json = json!({ - "transaction_id": announce.transaction_id.0, - "announce_interval": announce.announce_interval.0, - "leechers": announce.leechers.0, - "seeders": announce.seeders.0, - "peers": announce.peers.iter().map(|peer| format!("{}:{}", peer.ip_address, peer.port.0)).collect::>(), - }); - let pretty_json = serde_json::to_string_pretty(&json).context("announce IPv4 response JSON serialization")?; - println!("{pretty_json}"); - } - AnnounceIpv6(announce) => { - let json = json!({ - "transaction_id": announce.transaction_id.0, - "announce_interval": announce.announce_interval.0, - "leechers": announce.leechers.0, - "seeders": announce.seeders.0, - "peers6": announce.peers.iter().map(|peer| format!("{}:{}", peer.ip_address, peer.port.0)).collect::>(), - }); - let pretty_json = serde_json::to_string_pretty(&json).context("announce IPv6 response JSON serialization")?; - println!("{pretty_json}"); - } - Scrape(scrape) => { - let json = json!({ - "transaction_id": scrape.transaction_id.0, - "torrent_stats": scrape.torrent_stats.iter().map(|torrent_scrape_statistics| json!({ - "seeders": torrent_scrape_statistics.seeders.0, - "completed": torrent_scrape_statistics.completed.0, - "leechers": torrent_scrape_statistics.leechers.0, - })).collect::>(), - }); - let pretty_json = serde_json::to_string_pretty(&json).context("scrape response JSON serialization")?; - println!("{pretty_json}"); - } - _ => println!("{response:#?}"), // todo: serialize to JSON all responses. - }; - - Ok(()) + print_response(response) } fn setup_logging(level: LevelFilter) { @@ -210,6 +141,57 @@ fn setup_logging(level: LevelFilter) { debug!("logging initialized."); } +async fn handle_announce(tracker_socket_addr: &SocketAddr, info_hash: &TorrustInfoHash) -> anyhow::Result { + let transaction_id = TransactionId(RANDOM_TRANSACTION_ID); + + let mut client = checker::Client::default(); + + let bound_to = client.bind_and_connect(ASSIGNED_BY_OS, tracker_socket_addr).await?; + + let connection_id = client.send_connection_request(transaction_id).await?; + + client + .send_announce_request(connection_id, transaction_id, *info_hash, Port(bound_to.port())) + .await +} + +async fn handle_scrape(tracker_socket_addr: &SocketAddr, info_hashes: &[TorrustInfoHash]) -> anyhow::Result { + let transaction_id = TransactionId(RANDOM_TRANSACTION_ID); + + let mut client = checker::Client::default(); + + let _bound_to = client.bind_and_connect(ASSIGNED_BY_OS, tracker_socket_addr).await?; + + let connection_id = client.send_connection_request(transaction_id).await?; + + client + .send_scrape_request(connection_id, transaction_id, info_hashes.to_vec()) + .await +} + +fn print_response(response: Response) -> anyhow::Result<()> { + match response { + AnnounceIpv4(response) => { + let pretty_json = serde_json::to_string_pretty(&AnnounceResponseDto::from(response)) + .context("announce IPv4 response JSON serialization")?; + println!("{pretty_json}"); + } + AnnounceIpv6(response) => { + let pretty_json = serde_json::to_string_pretty(&AnnounceResponseDto::from(response)) + .context("announce IPv6 response JSON serialization")?; + println!("{pretty_json}"); + } + Scrape(response) => { + let pretty_json = + serde_json::to_string_pretty(&ScrapeResponseDto::from(response)).context("scrape response JSON serialization")?; + println!("{pretty_json}"); + } + _ => println!("{response:#?}"), // todo: serialize to JSON all aquatic responses. + }; + + Ok(()) +} + fn parse_socket_addr(tracker_socket_addr_str: &str) -> anyhow::Result { debug!("Tracker socket address: {tracker_socket_addr_str:#?}"); @@ -265,95 +247,3 @@ fn parse_info_hash(info_hash_str: &str) -> anyhow::Result { TorrustInfoHash::from_str(info_hash_str) .map_err(|e| anyhow::Error::msg(format!("failed to parse info-hash `{info_hash_str}`: {e:?}"))) } - -async fn connect( - tracker_socket_addr: &SocketAddr, - udp_client: UdpClient, - transaction_id: TransactionId, -) -> (ConnectionId, UdpTrackerClient) { - debug!("Connecting to tracker: udp://{tracker_socket_addr}"); - - udp_client.connect(&tracker_socket_addr.to_string()).await; - - let udp_tracker_client = UdpTrackerClient { udp_client }; - - let connection_id = send_connection_request(transaction_id, &udp_tracker_client).await; - - (connection_id, udp_tracker_client) -} - -async fn send_connection_request(transaction_id: TransactionId, client: &UdpTrackerClient) -> ConnectionId { - debug!("Sending connection request with transaction id: {transaction_id:#?}"); - - let connect_request = ConnectRequest { transaction_id }; - - client.send(connect_request.into()).await; - - let response = client.receive().await; - - debug!("connection request response:\n{response:#?}"); - - match response { - Response::Connect(connect_response) => connect_response.connection_id, - _ => panic!("error connecting to udp server. Unexpected response"), - } -} - -async fn send_announce_request( - connection_id: ConnectionId, - transaction_id: TransactionId, - info_hash: TorrustInfoHash, - port: Port, - client: &UdpTrackerClient, -) -> Response { - debug!("Sending announce request with transaction id: {transaction_id:#?}"); - - let announce_request = AnnounceRequest { - connection_id, - transaction_id, - info_hash: InfoHash(info_hash.bytes()), - peer_id: PeerId(*b"-qB00000000000000001"), - bytes_downloaded: NumberOfBytes(0i64), - bytes_uploaded: NumberOfBytes(0i64), - bytes_left: NumberOfBytes(0i64), - event: AnnounceEvent::Started, - ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)), - key: PeerKey(0u32), - peers_wanted: NumberOfPeers(1i32), - port, - }; - - client.send(announce_request.into()).await; - - let response = client.receive().await; - - debug!("announce request response:\n{response:#?}"); - - response -} - -async fn send_scrape_request( - connection_id: ConnectionId, - transaction_id: TransactionId, - info_hashes: Vec, - client: &UdpTrackerClient, -) -> Response { - debug!("Sending scrape request with transaction id: {transaction_id:#?}"); - - let scrape_request = ScrapeRequest { - connection_id, - transaction_id, - info_hashes: info_hashes - .iter() - .map(|torrust_info_hash| InfoHash(torrust_info_hash.bytes())) - .collect(), - }; - - client.send(scrape_request.into()).await; - - let response = client.receive().await; - - debug!("scrape request response:\n{response:#?}"); - - response -} diff --git a/src/console/clients/udp/checker.rs b/src/console/clients/udp/checker.rs new file mode 100644 index 000000000..b35139e49 --- /dev/null +++ b/src/console/clients/udp/checker.rs @@ -0,0 +1,214 @@ +use std::net::{Ipv4Addr, SocketAddr}; + +use anyhow::Context; +use aquatic_udp_protocol::common::InfoHash; +use aquatic_udp_protocol::{ + AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, Port, Response, + ScrapeRequest, TransactionId, +}; +use log::debug; +use thiserror::Error; + +use crate::shared::bit_torrent::info_hash::InfoHash as TorrustInfoHash; +use crate::shared::bit_torrent::tracker::udp::client::{UdpClient, UdpTrackerClient}; + +#[derive(Error, Debug)] +pub enum ClientError { + #[error("Local socket address is not bound yet. Try binding before connecting.")] + NotBound, + #[error("Not connected to remote tracker UDP socket. Try connecting before making requests.")] + NotConnected, + #[error("Unexpected response while connecting the the remote server.")] + UnexpectedConnectionResponse, +} + +/// A UDP Tracker client to make test requests (checks). +#[derive(Debug, Default)] +pub struct Client { + /// Local UDP socket. It could be 0 to assign a free port. + local_binding_address: Option, + + /// Local UDP socket after binding. It's equals to binding address if a + /// non- zero port was used. + local_bound_address: Option, + + /// Remote UDP tracker socket + remote_socket: Option, + + /// The client used to make UDP requests to the tracker. + udp_tracker_client: Option, +} + +impl Client { + /// Binds to the local socket and connects to the remote one. + /// + /// # Errors + /// + /// Will return an error if + /// + /// - It can't bound to the local socket address. + /// - It can't make a connection request successfully to the remote UDP server. + pub async fn bind_and_connect(&mut self, local_port: u16, remote_socket_addr: &SocketAddr) -> anyhow::Result { + let bound_to = self.bind(local_port).await?; + self.connect(remote_socket_addr).await?; + Ok(bound_to) + } + + /// Binds local client socket. + /// + /// # Errors + /// + /// Will return an error if it can't bound to the local address. + async fn bind(&mut self, local_port: u16) -> anyhow::Result { + let local_bind_to = format!("0.0.0.0:{local_port}"); + let binding_address = local_bind_to.parse().context("binding local address")?; + + debug!("Binding to: {local_bind_to}"); + let udp_client = UdpClient::bind(&local_bind_to).await; + + let bound_to = udp_client.socket.local_addr().context("bound local address")?; + debug!("Bound to: {bound_to}"); + + self.local_binding_address = Some(binding_address); + self.local_bound_address = Some(bound_to); + + self.udp_tracker_client = Some(UdpTrackerClient { udp_client }); + + Ok(bound_to) + } + + /// Connects to the remote server socket. + /// + /// # Errors + /// + /// Will return and error if it can't make a connection request successfully + /// to the remote UDP server. + async fn connect(&mut self, tracker_socket_addr: &SocketAddr) -> anyhow::Result<()> { + debug!("Connecting to tracker: udp://{tracker_socket_addr}"); + + match &self.udp_tracker_client { + Some(client) => { + client.udp_client.connect(&tracker_socket_addr.to_string()).await; + self.remote_socket = Some(*tracker_socket_addr); + Ok(()) + } + None => Err(ClientError::NotBound.into()), + } + } + + /// Sends a connection request to the UDP Tracker server. + /// + /// # Errors + /// + /// Will return and error if + /// + /// - It can't connect to the remote UDP socket. + /// - It can't make a connection request successfully to the remote UDP + /// server (after successfully connecting to the remote UDP socket). + /// + /// # Panics + /// + /// Will panic if it receives an unexpected response. + pub async fn send_connection_request(&self, transaction_id: TransactionId) -> anyhow::Result { + debug!("Sending connection request with transaction id: {transaction_id:#?}"); + + let connect_request = ConnectRequest { transaction_id }; + + match &self.udp_tracker_client { + Some(client) => { + client.send(connect_request.into()).await; + + let response = client.receive().await; + + debug!("connection request response:\n{response:#?}"); + + match response { + Response::Connect(connect_response) => Ok(connect_response.connection_id), + _ => Err(ClientError::UnexpectedConnectionResponse.into()), + } + } + None => Err(ClientError::NotConnected.into()), + } + } + + /// Sends an announce request to the UDP Tracker server. + /// + /// # Errors + /// + /// Will return and error if the client is not connected. You have to connect + /// before calling this function. + pub async fn send_announce_request( + &self, + connection_id: ConnectionId, + transaction_id: TransactionId, + info_hash: TorrustInfoHash, + client_port: Port, + ) -> anyhow::Result { + debug!("Sending announce request with transaction id: {transaction_id:#?}"); + + let announce_request = AnnounceRequest { + connection_id, + transaction_id, + info_hash: InfoHash(info_hash.bytes()), + peer_id: PeerId(*b"-qB00000000000000001"), + bytes_downloaded: NumberOfBytes(0i64), + bytes_uploaded: NumberOfBytes(0i64), + bytes_left: NumberOfBytes(0i64), + event: AnnounceEvent::Started, + ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)), + key: PeerKey(0u32), + peers_wanted: NumberOfPeers(1i32), + port: client_port, + }; + + match &self.udp_tracker_client { + Some(client) => { + client.send(announce_request.into()).await; + + let response = client.receive().await; + + debug!("announce request response:\n{response:#?}"); + + Ok(response) + } + None => Err(ClientError::NotConnected.into()), + } + } + + /// Sends a scrape request to the UDP Tracker server. + /// + /// # Errors + /// + /// Will return and error if the client is not connected. You have to connect + /// before calling this function. + pub async fn send_scrape_request( + &self, + connection_id: ConnectionId, + transaction_id: TransactionId, + info_hashes: Vec, + ) -> anyhow::Result { + debug!("Sending scrape request with transaction id: {transaction_id:#?}"); + + let scrape_request = ScrapeRequest { + connection_id, + transaction_id, + info_hashes: info_hashes + .iter() + .map(|torrust_info_hash| InfoHash(torrust_info_hash.bytes())) + .collect(), + }; + + match &self.udp_tracker_client { + Some(client) => { + client.send(scrape_request.into()).await; + + let response = client.receive().await; + + debug!("scrape request response:\n{response:#?}"); + + Ok(response) + } + None => Err(ClientError::NotConnected.into()), + } + } +} diff --git a/src/console/clients/udp/mod.rs b/src/console/clients/udp/mod.rs index 309be6287..2fcb26ed0 100644 --- a/src/console/clients/udp/mod.rs +++ b/src/console/clients/udp/mod.rs @@ -1 +1,3 @@ pub mod app; +pub mod checker; +pub mod responses; diff --git a/src/console/clients/udp/responses.rs b/src/console/clients/udp/responses.rs new file mode 100644 index 000000000..020c7a367 --- /dev/null +++ b/src/console/clients/udp/responses.rs @@ -0,0 +1,83 @@ +//! Aquatic responses are not serializable. These are the serializable wrappers. +use std::net::{Ipv4Addr, Ipv6Addr}; + +use aquatic_udp_protocol::{AnnounceResponse, ScrapeResponse}; +use serde::Serialize; + +#[derive(Serialize)] +pub struct AnnounceResponseDto { + transaction_id: i32, + announce_interval: i32, + leechers: i32, + seeders: i32, + peers: Vec, +} + +impl From> for AnnounceResponseDto { + fn from(announce: AnnounceResponse) -> Self { + Self { + transaction_id: announce.transaction_id.0, + announce_interval: announce.announce_interval.0, + leechers: announce.leechers.0, + seeders: announce.seeders.0, + peers: announce + .peers + .iter() + .map(|peer| format!("{}:{}", peer.ip_address, peer.port.0)) + .collect::>(), + } + } +} + +impl From> for AnnounceResponseDto { + fn from(announce: AnnounceResponse) -> Self { + Self { + transaction_id: announce.transaction_id.0, + announce_interval: announce.announce_interval.0, + leechers: announce.leechers.0, + seeders: announce.seeders.0, + peers: announce + .peers + .iter() + .map(|peer| format!("{}:{}", peer.ip_address, peer.port.0)) + .collect::>(), + } + } +} + +#[derive(Serialize)] +pub struct ScrapeResponseDto { + transaction_id: i32, + torrent_stats: Vec, +} + +impl From for ScrapeResponseDto { + fn from(scrape: ScrapeResponse) -> Self { + Self { + transaction_id: scrape.transaction_id.0, + torrent_stats: scrape + .torrent_stats + .iter() + .map(|torrent_scrape_statistics| TorrentStats { + seeders: torrent_scrape_statistics.seeders.0, + completed: torrent_scrape_statistics.completed.0, + leechers: torrent_scrape_statistics.leechers.0, + }) + .collect::>(), + } + } +} + +#[derive(Serialize)] +struct Peer { + seeders: i32, + completed: i32, + leechers: i32, +} + +#[derive(Serialize)] +struct TorrentStats { + seeders: i32, + completed: i32, + leechers: i32, +} diff --git a/src/shared/bit_torrent/tracker/udp/client.rs b/src/shared/bit_torrent/tracker/udp/client.rs index 23b718472..11c8d8f62 100644 --- a/src/shared/bit_torrent/tracker/udp/client.rs +++ b/src/shared/bit_torrent/tracker/udp/client.rs @@ -10,9 +10,18 @@ use tokio::time; use crate::shared::bit_torrent::tracker::udp::{source_address, MAX_PACKET_SIZE}; +/// Default timeout for sending and receiving packets. And waiting for sockets +/// to be readable and writable. +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); + #[allow(clippy::module_name_repetitions)] +#[derive(Debug)] pub struct UdpClient { + /// The socket to connect to pub socket: Arc, + + /// Timeout for sending and receiving packets + pub timeout: Duration, } impl UdpClient { @@ -28,6 +37,7 @@ impl UdpClient { Self { socket: Arc::new(socket), + timeout: DEFAULT_TIMEOUT, } } @@ -52,10 +62,23 @@ impl UdpClient { /// - Can't write to the socket. /// - Can't send data. pub async fn send(&self, bytes: &[u8]) -> usize { - debug!(target: "UDP client", "send {bytes:?}"); + debug!(target: "UDP client", "sending {bytes:?} ..."); + + match time::timeout(self.timeout, self.socket.writable()).await { + Ok(writable_result) => match writable_result { + Ok(()) => (), + Err(e) => panic!("{}", format!("IO error waiting for the socket to become readable: {e:?}")), + }, + Err(e) => panic!("{}", format!("Timeout waiting for the socket to become readable: {e:?}")), + }; - self.socket.writable().await.unwrap(); - self.socket.send(bytes).await.unwrap() + match time::timeout(self.timeout, self.socket.send(bytes)).await { + Ok(send_result) => match send_result { + Ok(size) => size, + Err(e) => panic!("{}", format!("IO error during send: {e:?}")), + }, + Err(e) => panic!("{}", format!("Send operation timed out: {e:?}")), + } } /// # Panics @@ -67,9 +90,21 @@ impl UdpClient { pub async fn receive(&self, bytes: &mut [u8]) -> usize { debug!(target: "UDP client", "receiving ..."); - self.socket.readable().await.unwrap(); + match time::timeout(self.timeout, self.socket.readable()).await { + Ok(readable_result) => match readable_result { + Ok(()) => (), + Err(e) => panic!("{}", format!("IO error waiting for the socket to become readable: {e:?}")), + }, + Err(e) => panic!("{}", format!("Timeout waiting for the socket to become readable: {e:?}")), + }; - let size = self.socket.recv(bytes).await.unwrap(); + let size = match time::timeout(self.timeout, self.socket.recv(bytes)).await { + Ok(recv_result) => match recv_result { + Ok(size) => size, + Err(e) => panic!("{}", format!("IO error during send: {e:?}")), + }, + Err(e) => panic!("{}", format!("Receive operation timed out: {e:?}")), + }; debug!(target: "UDP client", "{size} bytes received {bytes:?}"); @@ -86,6 +121,7 @@ pub async fn new_udp_client_connected(remote_address: &str) -> UdpClient { } #[allow(clippy::module_name_repetitions)] +#[derive(Debug)] pub struct UdpTrackerClient { pub udp_client: UdpClient, }