From ea8d4d8432f20a93850a90f6aee0e325e55c7dc6 Mon Sep 17 00:00:00 2001
From: Jose Celano
Date: Wed, 22 Feb 2023 13:13:55 +0000
Subject: [PATCH 1/9] feat(http): [#191] add route and extractor for scrape
 req in Axum HTTP tracker

with only one infohash in the URL:

http://localhost:7070/scrape?info_hash=%3B%24U%04%CF%5F%11%BB%DB%E1%20%1C%EAjk%F4Z%EE%1B%C0

It does not allow more than one infohash yet.
---
 .../axum_implementation/extractors/mod.rs     |   1 +
 .../extractors/scrape_request.rs              |  45 ++++++
 src/http/axum_implementation/handlers/mod.rs  |   1 +
 .../axum_implementation/handlers/scrape.rs    |  19 +++
 .../axum_implementation/handlers/status.rs    |   2 +-
 src/http/axum_implementation/requests/mod.rs  |   1 +
 .../axum_implementation/requests/scrape.rs    | 137 ++++++++++++++++++
 src/http/axum_implementation/routes.rs        |  10 +-
 8 files changed, 211 insertions(+), 5 deletions(-)
 create mode 100644 src/http/axum_implementation/extractors/scrape_request.rs
 create mode 100644 src/http/axum_implementation/handlers/scrape.rs
 create mode 100644 src/http/axum_implementation/requests/scrape.rs

diff --git a/src/http/axum_implementation/extractors/mod.rs b/src/http/axum_implementation/extractors/mod.rs
index 65b2775a9..380eeda6d 100644
--- a/src/http/axum_implementation/extractors/mod.rs
+++ b/src/http/axum_implementation/extractors/mod.rs
@@ -1,3 +1,4 @@
 pub mod announce_request;
 pub mod peer_ip;
 pub mod remote_client_ip;
+pub mod scrape_request;
diff --git a/src/http/axum_implementation/extractors/scrape_request.rs b/src/http/axum_implementation/extractors/scrape_request.rs
new file mode 100644
index 000000000..4212abfcb
--- /dev/null
+++ b/src/http/axum_implementation/extractors/scrape_request.rs
@@ -0,0 +1,45 @@
+use std::panic::Location;
+
+use axum::async_trait;
+use axum::extract::FromRequestParts;
+use axum::http::request::Parts;
+use axum::response::{IntoResponse, Response};
+
+use crate::http::axum_implementation::query::Query;
+use crate::http::axum_implementation::requests::scrape::{ParseScrapeQueryError, Scrape};
+use crate::http::axum_implementation::responses;
+
+pub struct ExtractRequest(pub Scrape);
+
+#[async_trait]
+impl<S> FromRequestParts<S> for ExtractRequest
+where
+    S: Send + Sync,
+{
+    type Rejection = Response;
+
+    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
+        let raw_query = parts.uri.query();
+
+        if raw_query.is_none() {
+            return Err(responses::error::Error::from(ParseScrapeQueryError::MissingParams {
+                location: Location::caller(),
+            })
+            .into_response());
+        }
+
+        let query = raw_query.unwrap().parse::<Query>();
+
+        if let Err(error) = query {
+            return Err(responses::error::Error::from(error).into_response());
+        }
+
+        let scrape_request = Scrape::try_from(query.unwrap());
+
+        if let Err(error) = scrape_request {
+            return Err(responses::error::Error::from(error).into_response());
+        }
+
+        Ok(ExtractRequest(scrape_request.unwrap()))
+    }
+}
diff --git a/src/http/axum_implementation/handlers/mod.rs b/src/http/axum_implementation/handlers/mod.rs
index bff05984c..4e6849534 100644
--- a/src/http/axum_implementation/handlers/mod.rs
+++ b/src/http/axum_implementation/handlers/mod.rs
@@ -1,2 +1,3 @@
 pub mod announce;
+pub mod scrape;
 pub mod status;
diff --git a/src/http/axum_implementation/handlers/scrape.rs b/src/http/axum_implementation/handlers/scrape.rs
new file mode 100644
index 000000000..094bf844b
--- /dev/null
+++ b/src/http/axum_implementation/handlers/scrape.rs
@@ -0,0 +1,19 @@
+use std::sync::Arc;
+
+use axum::extract::State;
+use log::debug;
+
+use crate::http::axum_implementation::extractors::remote_client_ip::RemoteClientIp;
+use crate::http::axum_implementation::extractors::scrape_request::ExtractRequest;
+use crate::tracker::Tracker;
+
+#[allow(clippy::unused_async)]
+pub async fn handle(
+    State(_tracker): State<Arc<Tracker>>,
+    ExtractRequest(scrape_request): ExtractRequest,
+    _remote_client_ip: RemoteClientIp,
+) -> String {
+    debug!("http scrape request: {:#?}", &scrape_request);
+
+    format!("{:#?}", &scrape_request)
+}
diff --git a/src/http/axum_implementation/handlers/status.rs b/src/http/axum_implementation/handlers/status.rs
index d4031aef5..8a058b456 100644
--- a/src/http/axum_implementation/handlers/status.rs
+++ b/src/http/axum_implementation/handlers/status.rs
@@ -7,6 +7,6 @@ use crate::http::axum_implementation::resources::ok::Ok;
 use crate::http::axum_implementation::responses::ok;
 
 #[allow(clippy::unused_async)]
-pub async fn get_status_handler(remote_client_ip: RemoteClientIp) -> Json<Ok> {
+pub async fn handle(remote_client_ip: RemoteClientIp) -> Json<Ok> {
     ok::response(&remote_client_ip)
 }
diff --git a/src/http/axum_implementation/requests/mod.rs b/src/http/axum_implementation/requests/mod.rs
index 74894de33..776d2dfbf 100644
--- a/src/http/axum_implementation/requests/mod.rs
+++ b/src/http/axum_implementation/requests/mod.rs
@@ -1 +1,2 @@
 pub mod announce;
+pub mod scrape;
diff --git a/src/http/axum_implementation/requests/scrape.rs b/src/http/axum_implementation/requests/scrape.rs
new file mode 100644
index 000000000..483738a03
--- /dev/null
+++ b/src/http/axum_implementation/requests/scrape.rs
@@ -0,0 +1,137 @@
+use std::panic::Location;
+
+use thiserror::Error;
+
+use crate::http::axum_implementation::query::Query;
+use crate::http::axum_implementation::responses;
+use crate::http::percent_encoding::percent_decode_info_hash;
+use crate::located_error::{Located, LocatedError};
+use crate::protocol::info_hash::{ConversionError, InfoHash};
+
+pub type NumberOfBytes = i64;
+
+// Query param name
+const INFO_HASH_SCRAPE_PARAM: &str = "info_hash";
+
+#[derive(Debug, PartialEq)]
+pub struct Scrape {
+    pub info_hashes: Vec<InfoHash>,
+}
+
+#[derive(Error, Debug)]
+pub enum ParseScrapeQueryError {
+    #[error("missing query params for scrape request in {location}")]
+    MissingParams { location: &'static Location<'static> },
+    #[error("missing param {param_name} in {location}")]
+    MissingParam {
+        location: &'static Location<'static>,
+        param_name: String,
+    },
+    #[error("invalid param value {param_value} for {param_name} in {location}")]
+    InvalidParam {
+        param_name: String,
+        param_value: String,
+        location: &'static Location<'static>,
+    },
+    #[error("invalid param value {param_value} for {param_name} in {source}")]
+    InvalidInfoHashParam {
+        param_name: String,
+        param_value: String,
+        source: LocatedError<'static, ConversionError>,
+    },
+}
+
+impl From<ParseScrapeQueryError> for responses::error::Error {
+    fn from(err: ParseScrapeQueryError) -> Self {
+        responses::error::Error {
+            failure_reason: format!("Cannot parse query params for scrape request: {err}"),
+        }
+    }
+}
+
+impl TryFrom<Query> for Scrape {
+    type Error = ParseScrapeQueryError;
+
+    fn try_from(query: Query) -> Result<Self, Self::Error> {
+        Ok(Self {
+            info_hashes: extract_info_hashes(&query)?,
+        })
+    }
+}
+
+fn extract_info_hashes(query: &Query) -> Result<Vec<InfoHash>, ParseScrapeQueryError> {
+    match query.get_param(INFO_HASH_SCRAPE_PARAM) {
+        Some(raw_param) => {
+            let mut info_hashes = vec![];
+
+            // todo: multiple infohashes
+
+            let info_hash = percent_decode_info_hash(&raw_param).map_err(|err| ParseScrapeQueryError::InvalidInfoHashParam {
+                param_name: INFO_HASH_SCRAPE_PARAM.to_owned(),
+                param_value: raw_param.clone(),
+                source: Located(err).into(),
+            })?;
+
+            info_hashes.push(info_hash);
+
+            Ok(info_hashes)
+        }
+        None => {
+            return Err(ParseScrapeQueryError::MissingParam {
+                location: Location::caller(),
+                param_name: INFO_HASH_SCRAPE_PARAM.to_owned(),
+            })
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    mod scrape_request {
+
+        use crate::http::axum_implementation::query::Query;
+        use crate::http::axum_implementation::requests::scrape::{Scrape, INFO_HASH_SCRAPE_PARAM};
+        use crate::protocol::info_hash::InfoHash;
+
+        #[test]
+        fn should_be_instantiated_from_the_url_query_with_only_one_infohash() {
+            let raw_query = Query::from(vec![(
+                INFO_HASH_SCRAPE_PARAM,
+                "%3B%24U%04%CF%5F%11%BB%DB%E1%20%1C%EAjk%F4Z%EE%1B%C0",
+            )])
+            .to_string();
+
+            let query = raw_query.parse::<Query>().unwrap();
+
+            let scrape_request = Scrape::try_from(query).unwrap();
+
+            assert_eq!(
+                scrape_request,
+                Scrape {
+                    info_hashes: vec!["3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0".parse::<InfoHash>().unwrap()],
+                }
+            );
+        }
+
+        mod when_it_is_instantiated_from_the_url_query_params {
+
+            use crate::http::axum_implementation::query::Query;
+            use crate::http::axum_implementation::requests::scrape::{Scrape, INFO_HASH_SCRAPE_PARAM};
+
+            #[test]
+            fn it_should_fail_if_the_query_does_not_include_the_info_hash_param() {
+                let raw_query_without_info_hash = "another_param=NOT_RELEVANT";
+
+                assert!(Scrape::try_from(raw_query_without_info_hash.parse::<Query>().unwrap()).is_err());
+            }
+
+            #[test]
+            fn it_should_fail_if_the_info_hash_param_is_invalid() {
+                let raw_query = Query::from(vec![(INFO_HASH_SCRAPE_PARAM, "INVALID_INFO_HASH_VALUE")]).to_string();
+
+                assert!(Scrape::try_from(raw_query.parse::<Query>().unwrap()).is_err());
+            }
+        }
+    }
+}
diff --git a/src/http/axum_implementation/routes.rs b/src/http/axum_implementation/routes.rs
index 6138f5acf..1d4d67e73 100644
--- a/src/http/axum_implementation/routes.rs
+++ b/src/http/axum_implementation/routes.rs
@@ -4,15 +4,17 @@ use axum::routing::get;
 use axum::Router;
 use axum_client_ip::SecureClientIpSource;
 
-use super::handlers::announce::handle;
-use super::handlers::status::get_status_handler;
+use super::handlers::{announce, scrape, status};
 use crate::tracker::Tracker;
 
 pub fn router(tracker: &Arc<Tracker>) -> Router {
     Router::new()
         // Status
-        .route("/status", get(get_status_handler))
+        .route("/status", get(status::handle))
         // Announce request
-        .route("/announce", get(handle).with_state(tracker.clone()))
+        .route("/announce", get(announce::handle).with_state(tracker.clone()))
+        // Scrape request
+        .route("/scrape", get(scrape::handle).with_state(tracker.clone()))
+
         // Add extension to get the client IP from the connection info
         .layer(SecureClientIpSource::ConnectInfo.into_extension())
 }

From 0cab696061eade23a0080c4a802e67df2c8c939a Mon Sep 17 00:00:00 2001
From: Jose Celano
Date: Thu, 23 Feb 2023 18:28:31 +0000
Subject: [PATCH 2/9] feat(http): [#191] add cargo dependency: multimap

That dependency will be used to store URL query params in a MultiMap
struct, because query params can have multiple values, like this:

```
param1=value1&param1=value2
```

A multimap allows storing multiple values for the same key, which a
plain HashMap does not.
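For reference, a minimal sketch of the `multimap` crate API this change
relies on (`insert` accepts repeated keys, `get` returns the first value,
`get_vec` returns all of them); the param names and values are
illustrative only:

```rust
use multimap::MultiMap;

fn main() {
    // "param1=value1&param1=value2" yields two values under one key.
    let mut params: MultiMap<String, String> = MultiMap::new();
    params.insert("param1".to_string(), "value1".to_string());
    params.insert("param1".to_string(), "value2".to_string());

    // `get` returns only the first value stored under the key.
    assert_eq!(params.get("param1"), Some(&"value1".to_string()));

    // `get_vec` returns every value stored under the key.
    assert_eq!(
        params.get_vec("param1"),
        Some(&vec!["value1".to_string(), "value2".to_string()])
    );
}
```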
---
 Cargo.lock  | 10 ++++++++++
 Cargo.toml  |  1 +
 cSpell.json |  1 +
 3 files changed, 12 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index 05b439353..cfd8aaba8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1477,6 +1477,15 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "multimap"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
+dependencies = [
+ "serde",
+]
+
 [[package]]
 name = "multipart"
 version = "0.18.0"
@@ -2945,6 +2954,7 @@ dependencies = [
  "local-ip-address",
  "log",
  "mockall",
+ "multimap",
  "openssl",
  "percent-encoding",
  "r2d2",
diff --git a/Cargo.toml b/Cargo.toml
index 917bc9e31..fa126a152 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -62,6 +62,7 @@ axum = "0.6.1"
 axum-server = { version = "0.4.4", features = ["tls-rustls"] }
 axum-client-ip = "0.4.0"
 bip_bencode = "0.4.4"
+multimap = "0.8.3"
 
 [dev-dependencies]
diff --git a/cSpell.json b/cSpell.json
index a451d18dc..b8aceb568 100644
--- a/cSpell.json
+++ b/cSpell.json
@@ -37,6 +37,7 @@
     "Lphant",
     "middlewares",
     "mockall",
+    "multimap",
     "myacicontext",
     "nanos",
     "nextest",

From 30cf3b9d66c4452d74719b0164b0258bd106bd50 Mon Sep 17 00:00:00 2001
From: Jose Celano
Date: Thu, 23 Feb 2023 18:30:58 +0000
Subject: [PATCH 3/9] feat(http): [#192] Query struct allows multiple values
 for the same param

The `torrust_tracker::http::axum_implementation::query` allows multiple
values for the same URL query param, for example:

```
param1=value1&param1=value2
```

It's needed in the `scrape` request:

http://localhost:7070/scrape?info_hash=%3B%24U%04%CF%5F%11%BB%DB%E1%20%1C%EAjk%F4Z%EE%1B%C0&info_hash=%3B%24U%04%CF%5F%11%BB%DB%E1%20%1C%EAjk%F4Z%EE%1B%C0
---
 src/http/axum_implementation/query.rs | 229 ++++++++++++++++++--------
 1 file changed, 157 insertions(+), 72 deletions(-)

diff --git a/src/http/axum_implementation/query.rs b/src/http/axum_implementation/query.rs
index cad58c17b..8b01e9db7 100644
--- a/src/http/axum_implementation/query.rs
+++ b/src/http/axum_implementation/query.rs
@@ -1,19 +1,50 @@
-use std::collections::HashMap;
 use std::panic::Location;
 use std::str::FromStr;
 
+use multimap::MultiMap;
 use thiserror::Error;
 
-/// Represent a URL query component with some restrictions.
-/// It does not allow duplicate param names like this: `param1=value1&param1=value2`
-/// It would take the second value for `param1`.
+type ParamName = String;
+type ParamValue = String;
+
+/// Represent a URL query component:
+///
+/// ```text
+/// URI = scheme ":" ["//" authority] path ["?" query] ["#" fragment]
+/// ```
+#[derive(Debug)]
 pub struct Query {
     /* code-review:
-        - Consider using `HashMap`, because it does not allow you to add a second value for the same param name.
         - Consider using a third-party crate.
         - Conversion from/to string is not deterministic. Params can be in a different order in the query string.
     */
-    params: HashMap<String, String>,
+    params: MultiMap<ParamName, NameValuePair>,
+}
+
+impl Query {
+    /// Returns only the first param value even if it has multiple values like this:
+    ///
+    /// ```text
+    /// param1=value1&param1=value2
+    /// ```
+    ///
+    /// In that case `get_param("param1")` will return `value1`.
+    #[must_use]
+    pub fn get_param(&self, name: &str) -> Option<String> {
+        self.params.get(name).map(|pair| pair.value.clone())
+    }
+
+    /// Returns all the param values as a vector even if it has only one value.
+    #[must_use]
+    pub fn get_param_vec(&self, name: &str) -> Option<Vec<String>> {
+        self.params.get_vec(name).map(|pairs| {
+            let mut param_values = vec![];
+            for pair in pairs {
+                param_values.push(pair.value.to_string());
+            }
+            param_values
+        })
+    }
 }
 
 #[derive(Error, Debug)]
@@ -29,13 +60,14 @@ impl FromStr for Query {
     type Err = ParseQueryError;
 
     fn from_str(raw_query: &str) -> Result<Self, Self::Err> {
-        let mut params: HashMap<String, String> = HashMap::new();
+        let mut params: MultiMap<ParamName, NameValuePair> = MultiMap::new();
 
         let raw_params = raw_query.trim().trim_start_matches('?').split('&').collect::<Vec<&str>>();
 
         for raw_param in raw_params {
-            let param: Param = raw_param.parse()?;
-            params.insert(param.name, param.value);
+            let pair: NameValuePair = raw_param.parse()?;
+            let param_name = pair.name.clone();
+            params.insert(param_name, pair);
         }
 
         Ok(Self { params })
@@ -44,10 +76,10 @@ impl FromStr for Query {
 
 impl From<Vec<(&str, &str)>> for Query {
     fn from(raw_params: Vec<(&str, &str)>) -> Self {
-        let mut params: HashMap<String, String> = HashMap::new();
+        let mut params: MultiMap<ParamName, NameValuePair> = MultiMap::new();
 
         for raw_param in raw_params {
-            params.insert(raw_param.0.to_owned(), raw_param.1.to_owned());
+            params.insert(raw_param.0.to_owned(), NameValuePair::new(raw_param.0, raw_param.1));
         }
 
         Self { params }
@@ -58,8 +90,8 @@ impl std::fmt::Display for Query {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         let query = self
             .params
-            .iter()
-            .map(|param| format!("{}", Param::new(param.0, param.1)))
+            .iter_all()
+            .map(|param| format!("{}", FieldValuePairSet::from_vec(param.1)))
             .collect::<Vec<String>>()
             .join("&");
 
         write!(f, "{query}")
     }
 }
 
-impl Query {
-    #[must_use]
-    pub fn get_param(&self, name: &str) -> Option<String> {
-        self.params.get(name).map(std::string::ToString::to_string)
-    }
+#[derive(Debug, PartialEq, Clone)]
+struct NameValuePair {
+    name: ParamName,
+    value: ParamValue,
 }
 
-#[derive(Debug, PartialEq)]
-struct Param {
-    name: String,
-    value: String,
+impl NameValuePair {
+    pub fn new(name: &str, value: &str) -> Self {
+        Self {
+            name: name.to_owned(),
+            value: value.to_owned(),
+        }
+    }
 }
 
-impl FromStr for Param {
+impl FromStr for NameValuePair {
     type Err = ParseQueryError;
 
     fn from_str(raw_param: &str) -> Result<Self, Self::Err> {
@@ -100,18 +134,39 @@ impl FromStr for NameValuePair {
     }
 }
 
-impl std::fmt::Display for Param {
+impl std::fmt::Display for NameValuePair {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "{}={}", self.name, self.value)
     }
 }
 
-impl Param {
-    pub fn new(name: &str, value: &str) -> Self {
-        Self {
-            name: name.to_owned(),
-            value: value.to_owned(),
-        }
+#[derive(Debug, PartialEq)]
+struct FieldValuePairSet {
+    pairs: Vec<NameValuePair>,
+}
+
+impl FieldValuePairSet {
+    fn from_vec(pair_vec: &Vec<NameValuePair>) -> Self {
+        let mut pairs: Vec<NameValuePair> = vec![];
+
+        for pair in pair_vec {
+            pairs.push(pair.clone());
+        }
+
+        Self { pairs }
+    }
+}
+
+impl std::fmt::Display for FieldValuePairSet {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let query = self
+            .pairs
+            .iter()
+            .map(|pair| format!("{pair}"))
+            .collect::<Vec<String>>()
+            .join("&");
+
+        write!(f, "{query}")
     }
 }
@@ -136,6 +191,14 @@ mod tests {
         assert_eq!(query.get_param("port").unwrap(), "17548");
     }
 
+    #[test]
+    fn should_be_instantiated_from_a_string_pair_vector() {
+        let query = Query::from(vec![("param1", "value1"), ("param2", "value2")]);
+
+        assert_eq!(query.get_param("param1"), Some("value1".to_string()));
+        assert_eq!(query.get_param("param2"), Some("value2".to_string()));
+    }
+
     #[test]
     fn should_fail_parsing_an_invalid_query_string() {
         let invalid_raw_query = "name=value=value";
@@ -151,7 +214,7 @@ mod tests {
 
         let query = raw_query.parse::<Query>().unwrap();
 
-        assert_eq!(query.get_param("name").unwrap(), "value");
+        assert_eq!(query.get_param("name"), Some("value".to_string()));
     }
 
     #[test]
@@ -160,61 +223,83 @@ mod tests {
 
         let query = raw_query.parse::<Query>().unwrap();
 
-        assert_eq!(query.get_param("name").unwrap(), "value");
-    }
-
-    #[test]
-    fn should_be_instantiated_from_a_string_pair_vector() {
-        let query = Query::from(vec![("param1", "value1"), ("param2", "value2")]).to_string();
-
-        assert!(query == "param1=value1&param2=value2" || query == "param2=value2&param1=value1");
+        assert_eq!(query.get_param("name"), Some("value".to_string()));
     }
 
-    #[test]
-    fn should_not_allow_more_than_one_value_for_the_same_param() {
-        let query = Query::from(vec![("param1", "value1"), ("param1", "value2"), ("param1", "value3")]).to_string();
-
-        assert_eq!(query, "param1=value3");
+    mod should_allow_more_than_one_value_for_the_same_param {
+        use crate::http::axum_implementation::query::Query;
+
+        #[test]
+        fn instantiated_from_a_vector() {
+            let query1 = Query::from(vec![("param1", "value1"), ("param1", "value2")]);
+            assert_eq!(
+                query1.get_param_vec("param1"),
+                Some(vec!["value1".to_string(), "value2".to_string()])
+            );
+        }
+
+        #[test]
+        fn parsed_from_an_string() {
+            let query2 = "param1=value1&param1=value2".parse::<Query>().unwrap();
+            assert_eq!(
+                query2.get_param_vec("param1"),
+                Some(vec!["value1".to_string(), "value2".to_string()])
+            );
+        }
     }
 
-    #[test]
-    fn should_be_displayed() {
-        let query = "param1=value1&param2=value2".parse::<Query>().unwrap().to_string();
-
-        assert!(query == "param1=value1&param2=value2" || query == "param2=value2&param1=value1");
+    mod should_be_displayed {
+        use crate::http::axum_implementation::query::Query;
+
+        #[test]
+        fn with_one_param() {
+            assert_eq!("param1=value1".parse::<Query>().unwrap().to_string(), "param1=value1");
+        }
+
+        #[test]
+        fn with_multiple_params() {
+            let query = "param1=value1&param2=value2".parse::<Query>().unwrap().to_string();
+            assert!(query == "param1=value1&param2=value2" || query == "param2=value2&param1=value1");
+        }
+
+        #[test]
+        fn with_multiple_values_for_the_same_param() {
+            let query = "param1=value1&param1=value2".parse::<Query>().unwrap().to_string();
+            assert!(query == "param1=value1&param1=value2" || query == "param1=value2&param1=value1");
+        }
     }
-    }
 
-    mod url_query_param {
-        use crate::http::axum_implementation::query::Param;
+    mod param_name_value_pair {
+        use crate::http::axum_implementation::query::NameValuePair;
 
-        #[test]
-        fn should_parse_a_single_query_param() {
-            let raw_param = "name=value";
+        #[test]
+        fn should_parse_a_single_query_param() {
+            let raw_param = "name=value";
 
-            let param = raw_param.parse::<Param>().unwrap();
+            let param = raw_param.parse::<NameValuePair>().unwrap();
 
-            assert_eq!(
-                param,
-                Param {
-                    name: "name".to_string(),
-                    value: "value".to_string(),
-                }
-            );
-        }
+            assert_eq!(
+                param,
+                NameValuePair {
+                    name: "name".to_string(),
+                    value: "value".to_string(),
+                }
+            );
+        }
 
-        #[test]
-        fn should_fail_parsing_an_invalid_query_param() {
-            let invalid_raw_param = "name=value=value";
+        #[test]
+        fn should_fail_parsing_an_invalid_query_param() {
+            let invalid_raw_param = "name=value=value";
 
-            let query = invalid_raw_param.parse::<Param>();
+            let query = invalid_raw_param.parse::<NameValuePair>();
 
-            assert!(query.is_err());
-        }
+            assert!(query.is_err());
+        }
 
-        #[test]
-        fn should_be_displayed() {
-            assert_eq!("name=value".parse::<Param>().unwrap().to_string(), "name=value");
+        #[test]
+        fn should_be_displayed() {
+            assert_eq!("name=value".parse::<NameValuePair>().unwrap().to_string(), "name=value");
+        }
     }
 }

From 2de8265eaba098f5c69cd8cbfcbc37f05d958044 Mon Sep 17 00:00:00 2001
From: Jose Celano
Date: Thu, 23 Feb 2023 18:33:47 +0000
Subject: [PATCH 4/9] feat(http): [#191] parse scrape req with multiple
 infohashes

---
 .../axum_implementation/requests/scrape.rs    | 21 ++++++++++---------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/src/http/axum_implementation/requests/scrape.rs b/src/http/axum_implementation/requests/scrape.rs
index 483738a03..0f23039bb 100644
--- a/src/http/axum_implementation/requests/scrape.rs
+++ b/src/http/axum_implementation/requests/scrape.rs
@@ -60,19 +60,20 @@ impl TryFrom<Query> for Scrape {
 }
 
 fn extract_info_hashes(query: &Query) -> Result<Vec<InfoHash>, ParseScrapeQueryError> {
-    match query.get_param(INFO_HASH_SCRAPE_PARAM) {
-        Some(raw_param) => {
+    match query.get_param_vec(INFO_HASH_SCRAPE_PARAM) {
+        Some(raw_params) => {
             let mut info_hashes = vec![];
 
-            // todo: multiple infohashes
+            for raw_param in raw_params {
+                let info_hash =
+                    percent_decode_info_hash(&raw_param).map_err(|err| ParseScrapeQueryError::InvalidInfoHashParam {
+                        param_name: INFO_HASH_SCRAPE_PARAM.to_owned(),
+                        param_value: raw_param.clone(),
+                        source: Located(err).into(),
+                    })?;
 
-            let info_hash = percent_decode_info_hash(&raw_param).map_err(|err| ParseScrapeQueryError::InvalidInfoHashParam {
-                param_name: INFO_HASH_SCRAPE_PARAM.to_owned(),
-                param_value: raw_param.clone(),
-                source: Located(err).into(),
-            })?;
-
-            info_hashes.push(info_hash);
+                info_hashes.push(info_hash);
+            }
 
             Ok(info_hashes)
         }

From 0c7735a0b14a03f1268daa41b232d0918cbfe37f Mon Sep 17 00:00:00 2001
From: Jose Celano
Date: Thu, 23 Feb 2023 18:34:13 +0000
Subject: [PATCH 5/9] fix(http): typo in comment

---
 src/http/percent_encoding.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/http/percent_encoding.rs b/src/http/percent_encoding.rs
index 9b5b79ed7..3774519fb 100644
--- a/src/http/percent_encoding.rs
+++ b/src/http/percent_encoding.rs
@@ -3,7 +3,7 @@ use crate::tracker::peer::{self, IdConversionError};
 
 /// # Errors
 ///
-/// Will return `Err` if if the decoded bytes do not represent a valid `InfoHash`.
+/// Will return `Err` if the decoded bytes do not represent a valid `InfoHash`.
 pub fn percent_decode_info_hash(raw_info_hash: &str) -> Result<InfoHash, ConversionError> {
     let bytes = percent_encoding::percent_decode_str(raw_info_hash).collect::<Vec<u8>>();
     InfoHash::try_from(bytes)

From c4bee79c7ad15018bffbf39e69e663022aac16b6 Mon Sep 17 00:00:00 2001
From: Jose Celano
Date: Fri, 24 Feb 2023 16:43:06 +0000
Subject: [PATCH 6/9] feat(http): [#191] add Tracker::scrape function

This function returns the data we need for a scrape response, regardless
of the method the client is using to communicate with the tracker (UDP
or HTTP).
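For context, a minimal usage sketch of the new function, assuming an
already-constructed `Arc<Tracker>` (the helper name `scrape_one` and the
info-hash value are illustrative only). Torrents the tracker does not
know about still get an entry in the result, with zeroed `SwarmMetadata`:

```rust
use std::sync::Arc;

use crate::protocol::info_hash::InfoHash;
use crate::tracker::{ScrapeData, Tracker};

// Scrape a single torrent; unknown info-hashes yield zeroed metadata.
async fn scrape_one(tracker: &Arc<Tracker>) -> ScrapeData {
    let info_hashes: Vec<InfoHash> = vec!["3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0"
        .parse()
        .expect("valid info hash")];

    tracker.scrape(&info_hashes).await
}
```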
---
 .../axum_implementation/handlers/scrape.rs    |  14 +-
 src/tracker/mod.rs                            | 348 +++++++++++++-----
 src/tracker/torrent.rs                        |  18 +
 3 files changed, 286 insertions(+), 94 deletions(-)

diff --git a/src/http/axum_implementation/handlers/scrape.rs b/src/http/axum_implementation/handlers/scrape.rs
index 094bf844b..2246ea7db 100644
--- a/src/http/axum_implementation/handlers/scrape.rs
+++ b/src/http/axum_implementation/handlers/scrape.rs
@@ -9,11 +9,21 @@ use crate::tracker::Tracker;
 
 #[allow(clippy::unused_async)]
 pub async fn handle(
-    State(_tracker): State<Arc<Tracker>>,
+    State(tracker): State<Arc<Tracker>>,
     ExtractRequest(scrape_request): ExtractRequest,
     _remote_client_ip: RemoteClientIp,
 ) -> String {
     debug!("http scrape request: {:#?}", &scrape_request);
 
-    format!("{:#?}", &scrape_request)
+    /*
+        todo:
+        - Add the service that sends the event for statistics.
+        - Build the HTTP bencoded response.
+    */
+
+    let scrape_data = tracker.scrape(&scrape_request.info_hashes).await;
+
+    debug!("scrape data: {:#?}", &scrape_data);
+
+    "todo".to_string()
 }
diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs
index e01fe6a19..0a3bd7c0b 100644
--- a/src/tracker/mod.rs
+++ b/src/tracker/mod.rs
@@ -7,7 +7,7 @@ pub mod statistics;
 pub mod torrent;
 
 use std::collections::btree_map::Entry;
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
 use std::net::IpAddr;
 use std::panic::Location;
 use std::sync::Arc;
@@ -18,7 +18,7 @@ use tokio::sync::{RwLock, RwLockReadGuard};
 
 use self::error::Error;
 use self::peer::Peer;
-use self::torrent::SwamStats;
+use self::torrent::{SwamStats, SwarmMetadata};
 use crate::config::Configuration;
 use crate::databases::driver::Driver;
 use crate::databases::{self, Database};
@@ -50,6 +50,27 @@ pub struct AnnounceData {
     pub interval_min: u32,
 }
 
+#[derive(Debug, PartialEq, Default)]
+pub struct ScrapeData {
+    files: HashMap<InfoHash, SwarmMetadata>,
+}
+
+impl ScrapeData {
+    #[must_use]
+    pub fn empty() -> Self {
+        let files: HashMap<InfoHash, SwarmMetadata> = HashMap::new();
+        Self { files }
+    }
+
+    pub fn add_file(&mut self, info_hash: &InfoHash, swarm_metadata: SwarmMetadata) {
+        self.files.insert(*info_hash, swarm_metadata);
+    }
+
+    pub fn add_file_with_no_metadata(&mut self, info_hash: &InfoHash) {
+        self.files.insert(*info_hash, SwarmMetadata::default());
+    }
+}
+
 impl Tracker {
     /// # Errors
     ///
@@ -85,8 +106,14 @@ impl Tracker {
         self.mode == mode::Mode::Listed || self.mode == mode::Mode::PrivateListed
     }
 
-    /// It handles an announce request
+    /// It handles an announce request.
+    ///
+    /// BEP 03: [The `BitTorrent` Protocol Specification](https://www.bittorrent.org/beps/bep_0003.html).
     pub async fn announce(&self, info_hash: &InfoHash, peer: &mut Peer, remote_client_ip: &IpAddr) -> AnnounceData {
+        // code-review: maybe instead of mutating the peer we could just return
+        // a tuple with the new peer and the announce data: (Peer, AnnounceData).
+        // It could even be a different struct: `StoredPeer` or `PublicPeer`.
+
         peer.change_ip(&assign_ip_address_to_peer(remote_client_ip, self.config.get_ext_ip()));
 
         let swam_stats = self.update_torrent_with_peer_and_get_stats(info_hash, peer).await;
@@ -101,6 +128,27 @@ impl Tracker {
         }
     }
 
+    /// It handles a scrape request.
+    ///
+    /// BEP 48: [Tracker Protocol Extension: Scrape](https://www.bittorrent.org/beps/bep_0048.html).
+    pub async fn scrape(&self, info_hashes: &Vec<InfoHash>) -> ScrapeData {
+        let mut scrape_data = ScrapeData::empty();
+
+        for info_hash in info_hashes {
+            scrape_data.add_file(info_hash, self.get_swarm_metadata(info_hash).await);
+        }
+
+        scrape_data
+    }
+
+    async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> SwarmMetadata {
+        let torrents = self.get_torrents().await;
+        match torrents.get(info_hash) {
+            Some(torrent_entry) => torrent_entry.get_swarm_metadata(),
+            None => SwarmMetadata::default(),
+        }
+    }
+
     /// # Errors
     ///
     /// Will return a `database::Error` if unable to add the `auth_key` to the database.
@@ -416,143 +464,259 @@ fn assign_ip_address_to_peer(remote_client_ip: &IpAddr, tracker_external_ip: Opt
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
 
-    use super::statistics::Keeper;
-    use super::{TorrentsMetrics, Tracker};
-    use crate::config::{ephemeral_configuration, Configuration};
+    mod the_tracker {
 
-    pub fn tracker_configuration() -> Arc<Configuration> {
-        Arc::new(ephemeral_configuration())
-    }
+        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+        use std::sync::Arc;
+
+        use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes};
+
+        use crate::config::{ephemeral_configuration, Configuration};
+        use crate::protocol::clock::DurationSinceUnixEpoch;
+        use crate::tracker::peer::{self, Peer};
+        use crate::tracker::statistics::Keeper;
+        use crate::tracker::{TorrentsMetrics, Tracker};
+
+        pub fn tracker_configuration() -> Arc<Configuration> {
+            Arc::new(ephemeral_configuration())
+        }
 
-    pub fn tracker_factory() -> Tracker {
-        // code-review: the tracker initialization is duplicated in many places. Consider make this function public.
+        pub fn tracker_factory() -> Tracker {
+            // code-review: the tracker initialization is duplicated in many places. Consider make this function public.
 
-        // Configuration
-        let configuration = tracker_configuration();
+            // Configuration
+            let configuration = tracker_configuration();
 
-        // Initialize stats tracker
-        let (stats_event_sender, stats_repository) = Keeper::new_active_instance();
+            // Initialize stats tracker
+            let (stats_event_sender, stats_repository) = Keeper::new_active_instance();
 
-        // Initialize Torrust tracker
-        match Tracker::new(&configuration, Some(stats_event_sender), stats_repository) {
-            Ok(tracker) => tracker,
-            Err(error) => {
-                panic!("{}", error)
+            // Initialize Torrust tracker
+            match Tracker::new(&configuration, Some(stats_event_sender), stats_repository) {
+                Ok(tracker) => tracker,
+                Err(error) => {
+                    panic!("{}", error)
+                }
             }
         }
-    }
 
+        /// A peer that has completed downloading.
+        fn complete_peer() -> Peer {
+            Peer {
+                peer_id: peer::Id(*b"-qB00000000000000000"),
+                peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080),
+                updated: DurationSinceUnixEpoch::new(1_669_397_478_934, 0),
+                uploaded: NumberOfBytes(0),
+                downloaded: NumberOfBytes(0),
+                left: NumberOfBytes(0), // No bytes left to download
+                event: AnnounceEvent::Completed,
+            }
+        }
+
+        /// A peer that has NOT completed downloading.
+        fn incomplete_peer() -> Peer {
+            Peer {
+                peer_id: peer::Id(*b"-qB00000000000000000"),
+                peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080),
+                updated: DurationSinceUnixEpoch::new(1_669_397_478_934, 0),
+                uploaded: NumberOfBytes(0),
+                downloaded: NumberOfBytes(0),
+                left: NumberOfBytes(1000), // Still bytes to download
+                event: AnnounceEvent::Started,
+            }
+        }
 
-    #[tokio::test]
-    async fn the_tracker_should_collect_torrent_metrics() {
-        let tracker = tracker_factory();
-
-        let torrents_metrics = tracker.get_torrents_metrics().await;
+        #[tokio::test]
+        async fn should_collect_torrent_metrics() {
+            let tracker = tracker_factory();
 
-        assert_eq!(
-            torrents_metrics,
-            TorrentsMetrics {
-                seeders: 0,
-                completed: 0,
-                leechers: 0,
-                torrents: 0
-            }
-        );
-    }
+            let torrents_metrics = tracker.get_torrents_metrics().await;
+
+            assert_eq!(
+                torrents_metrics,
+                TorrentsMetrics {
+                    seeders: 0,
+                    completed: 0,
+                    leechers: 0,
+                    torrents: 0
+                }
+            );
+        }
 
-    mod the_tracker_assigning_the_ip_to_the_peer {
+        mod handling_an_announce_request {
+            mod should_assign_the_ip_to_the_peer {
 
-        use std::net::{IpAddr, Ipv4Addr};
+                use std::net::{IpAddr, Ipv4Addr};
 
-        use crate::tracker::assign_ip_address_to_peer;
+                use crate::tracker::assign_ip_address_to_peer;
 
-        #[test]
-        fn should_use_the_source_ip_instead_of_the_ip_in_the_announce_request() {
-            let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2));
+                #[test]
+                fn using_the_source_ip_instead_of_the_ip_in_the_announce_request() {
+                    let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2));
 
-            let peer_ip = assign_ip_address_to_peer(&remote_ip, None);
+                    let peer_ip = assign_ip_address_to_peer(&remote_ip, None);
 
-            assert_eq!(peer_ip, remote_ip);
-        }
+                    assert_eq!(peer_ip, remote_ip);
+                }
 
-        mod when_the_client_ip_is_a_ipv4_loopback_ip {
+                mod and_when_the_client_ip_is_a_ipv4_loopback_ip {
 
-            use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
-            use std::str::FromStr;
+                    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
+                    use std::str::FromStr;
 
-            use crate::tracker::assign_ip_address_to_peer;
+                    use crate::tracker::assign_ip_address_to_peer;
 
-            #[test]
-            fn it_should_use_the_loopback_ip_if_the_tracker_does_not_have_the_external_ip_configuration() {
-                let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
+                    #[test]
+                    fn it_should_use_the_loopback_ip_if_the_tracker_does_not_have_the_external_ip_configuration() {
+                        let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
 
-                let peer_ip = assign_ip_address_to_peer(&remote_ip, None);
+                        let peer_ip = assign_ip_address_to_peer(&remote_ip, None);
 
-                assert_eq!(peer_ip, remote_ip);
-            }
+                        assert_eq!(peer_ip, remote_ip);
+                    }
 
-            #[test]
-            fn it_should_use_the_external_tracker_ip_in_tracker_configuration_if_it_is_defined() {
-                let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
+                    #[test]
+                    fn it_should_use_the_external_tracker_ip_in_tracker_configuration_if_it_is_defined() {
+                        let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
 
-                let tracker_external_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap());
+                        let tracker_external_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap());
 
-                let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));
+                        let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));
 
-                assert_eq!(peer_ip, tracker_external_ip);
-            }
+                        assert_eq!(peer_ip, tracker_external_ip);
+                    }
 
-            #[test]
-            fn it_should_use_the_external_ip_in_the_tracker_configuration_if_it_is_defined_even_if_the_external_ip_is_an_ipv6_ip()
-            {
-                let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
+                    #[test]
+                    fn it_should_use_the_external_ip_in_the_tracker_configuration_if_it_is_defined_even_if_the_external_ip_is_an_ipv6_ip(
+                    ) {
+                        let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
+
+                        let tracker_external_ip =
+                            IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap());
+
+                        let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));
+
+                        assert_eq!(peer_ip, tracker_external_ip);
+                    }
+                }
 
-                let tracker_external_ip = IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap());
+                mod and_when_client_ip_is_a_ipv6_loopback_ip {
 
-                let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));
+                    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
+                    use std::str::FromStr;
 
-                assert_eq!(peer_ip, tracker_external_ip);
-            }
-        }
+                    use crate::tracker::assign_ip_address_to_peer;
 
-        mod when_client_ip_is_a_ipv6_loopback_ip {
+                    #[test]
+                    fn it_should_use_the_loopback_ip_if_the_tracker_does_not_have_the_external_ip_configuration() {
+                        let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);
 
-            use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
-            use std::str::FromStr;
+                        let peer_ip = assign_ip_address_to_peer(&remote_ip, None);
 
-            use crate::tracker::assign_ip_address_to_peer;
+                        assert_eq!(peer_ip, remote_ip);
+                    }
 
-            #[test]
-            fn it_should_use_the_loopback_ip_if_the_tracker_does_not_have_the_external_ip_configuration() {
-                let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);
+                    #[test]
+                    fn it_should_use_the_external_ip_in_tracker_configuration_if_it_is_defined() {
+                        let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);
 
-                let peer_ip = assign_ip_address_to_peer(&remote_ip, None);
+                        let tracker_external_ip =
+                            IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap());
 
-                assert_eq!(peer_ip, remote_ip);
-            }
+                        let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));
 
-            #[test]
-            fn it_should_use_the_external_ip_in_tracker_configuration_if_it_is_defined() {
-                let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);
+                        assert_eq!(peer_ip, tracker_external_ip);
+                    }
 
-                let tracker_external_ip = IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap());
+                    #[test]
+                    fn it_should_use_the_external_ip_in_the_tracker_configuration_if_it_is_defined_even_if_the_external_ip_is_an_ipv4_ip(
+                    ) {
+                        let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);
 
-                let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));
+                        let tracker_external_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap());
 
-                assert_eq!(peer_ip, tracker_external_ip);
-            }
+                        let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));
 
-            #[test]
-            fn it_should_use_the_external_ip_in_the_tracker_configuration_if_it_is_defined_even_if_the_external_ip_is_an_ipv4_ip()
-            {
-                let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);
+                        assert_eq!(peer_ip, tracker_external_ip);
+                    }
+                }
+            }
+        }
 
-                let tracker_external_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap());
+        mod handling_a_scrape_request {
 
-                let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));
+            use std::net::{IpAddr, Ipv4Addr};
 
-                assert_eq!(peer_ip, tracker_external_ip);
-            }
-        }
-    }
-}
+            use crate::protocol::info_hash::InfoHash;
+            use crate::tracker::tests::the_tracker::{complete_peer, incomplete_peer, tracker_factory};
+            use crate::tracker::{ScrapeData, SwarmMetadata};
+
+            #[tokio::test]
+            async fn it_should_return_a_zeroed_swarm_metadata_for_the_requested_file_if_the_tracker_does_not_have_that_torrent() {
+                let tracker = tracker_factory();
+
+                let info_hashes = vec!["3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0".parse::<InfoHash>().unwrap()];
+
+                let scrape_data = tracker.scrape(&info_hashes).await;
+
+                let mut expected_scrape_data = ScrapeData::empty();
+
+                expected_scrape_data.add_file_with_no_metadata(&info_hashes[0]);
+
+                assert_eq!(scrape_data, expected_scrape_data);
+            }
+
+            #[tokio::test]
+            async fn it_should_return_the_swarm_metadata_for_the_requested_file_if_the_tracker_has_that_torrent() {
+                let tracker = tracker_factory();
+
+                let info_hash = "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0".parse::<InfoHash>().unwrap();
+
+                // Announce a "complete" peer for the torrent
+                let mut complete_peer = complete_peer();
+                tracker
+                    .announce(&info_hash, &mut complete_peer, &IpAddr::V4(Ipv4Addr::new(126, 0, 0, 10)))
+                    .await;
+
+                // Announce an "incomplete" peer for the torrent
+                let mut incomplete_peer = incomplete_peer();
+                tracker
+                    .announce(&info_hash, &mut incomplete_peer, &IpAddr::V4(Ipv4Addr::new(126, 0, 0, 11)))
+                    .await;
+
+                // Scrape
+                let scrape_data = tracker.scrape(&vec![info_hash]).await;
+
+                // The expected swarm metadata for the file
+                let mut expected_scrape_data = ScrapeData::empty();
+                expected_scrape_data.add_file(
+                    &info_hash,
+                    SwarmMetadata {
+                        complete: 0, // the "complete" peer does not count because it was not previously known
+                        downloaded: 0,
+                        incomplete: 1, // the "incomplete" peer we have just announced
+                    },
+                );
+
+                assert_eq!(scrape_data, expected_scrape_data);
+            }
+
+            #[tokio::test]
+            async fn it_should_allow_scraping_for_multiple_torrents() {
+                let tracker = tracker_factory();
+
+                let info_hashes = vec![
+                    "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0".parse::<InfoHash>().unwrap(),
+                    "99c82bb73505a3c0b453f9fa0e881d6e5a32a0c1".parse::<InfoHash>().unwrap(),
+                ];
+
+                let scrape_data = tracker.scrape(&info_hashes).await;
+
+                let mut expected_scrape_data = ScrapeData::empty();
+                expected_scrape_data.add_file_with_no_metadata(&info_hashes[0]);
+                expected_scrape_data.add_file_with_no_metadata(&info_hashes[1]);
+
+                assert_eq!(scrape_data, expected_scrape_data);
+            }
+        }
+    }
+}
diff --git a/src/tracker/torrent.rs b/src/tracker/torrent.rs
index 3161cd36b..34017599d 100644
--- a/src/tracker/torrent.rs
+++ b/src/tracker/torrent.rs
@@ -14,6 +14,13 @@ pub struct Entry {
     pub completed: u32,
 }
 
+#[derive(Debug, PartialEq, Default)]
+pub struct SwarmMetadata {
+    pub complete: u32,   // The number of active peers that have completed downloading
+    pub downloaded: u32, // The number of peers that have ever completed downloading
+    pub incomplete: u32, // The number of active peers that have not completed downloading
+}
+
 impl Entry {
     #[must_use]
     pub fn new() -> Entry {
@@ -74,6 +81,17 @@ impl Entry {
         (seeders, self.completed, leechers)
     }
 
+    #[must_use]
+    pub fn get_swarm_metadata(&self) -> SwarmMetadata {
+        // code-review: consider using always this function instead of `get_stats`.
+        let (seeders, completed, leechers) = self.get_stats();
+        SwarmMetadata {
+            complete: seeders,
+            downloaded: completed,
+            incomplete: leechers,
+        }
+    }
+
     pub fn remove_inactive_peers(&mut self, max_peer_timeout: u32) {
         let current_cutoff = Current::sub(&Duration::from_secs(u64::from(max_peer_timeout))).unwrap_or_default();
         self.peers.retain(|_, peer| peer.updated > current_cutoff);

From ae1a076c57bc74fbd73dc42e54df373513c642d3 Mon Sep 17 00:00:00 2001
From: Jose Celano
Date: Fri, 24 Feb 2023 17:23:47 +0000
Subject: [PATCH 7/9] feat(http): [#191] add scrape app service

---
 .../axum_implementation/extractors/peer_ip.rs |  2 +-
 .../axum_implementation/handlers/announce.rs  |  4 ++--
 .../axum_implementation/handlers/scrape.rs    | 21 +++++++++++++------
 .../axum_implementation/services/announce.rs  |  4 ++--
 src/http/axum_implementation/services/mod.rs  |  1 +
 .../axum_implementation/services/scrape.rs    | 20 ++++++++++++++++++
 src/tracker/statistics.rs                     |  2 ++
 tests/http_tracker.rs                         |  6 ++----
 8 files changed, 45 insertions(+), 15 deletions(-)
 create mode 100644 src/http/axum_implementation/services/scrape.rs

diff --git a/src/http/axum_implementation/extractors/peer_ip.rs b/src/http/axum_implementation/extractors/peer_ip.rs
index 9f7e92a9b..aae348d99 100644
--- a/src/http/axum_implementation/extractors/peer_ip.rs
+++ b/src/http/axum_implementation/extractors/peer_ip.rs
@@ -31,7 +31,7 @@ impl From for responses::error::Error {
 ///
 /// Will return an error if the peer IP cannot be obtained according to the configuration.
 /// For example, if the IP is extracted from an HTTP header which is missing in the request.
-pub fn assign_ip_address_to_peer(on_reverse_proxy: bool, remote_client_ip: &RemoteClientIp) -> Result<IpAddr, Response> {
+pub fn resolve(on_reverse_proxy: bool, remote_client_ip: &RemoteClientIp) -> Result<IpAddr, Response> {
     if on_reverse_proxy {
         if let Some(ip) = remote_client_ip.right_most_x_forwarded_for {
             Ok(ip)
diff --git a/src/http/axum_implementation/handlers/announce.rs b/src/http/axum_implementation/handlers/announce.rs
index 81f57e810..d5fa7f3a4 100644
--- a/src/http/axum_implementation/handlers/announce.rs
+++ b/src/http/axum_implementation/handlers/announce.rs
@@ -7,7 +7,7 @@ use axum::response::{IntoResponse, Response};
 use log::debug;
 
 use crate::http::axum_implementation::extractors::announce_request::ExtractRequest;
-use crate::http::axum_implementation::extractors::peer_ip::assign_ip_address_to_peer;
+use crate::http::axum_implementation::extractors::peer_ip;
 use crate::http::axum_implementation::extractors::remote_client_ip::RemoteClientIp;
 use crate::http::axum_implementation::requests::announce::{Announce, Compact, Event};
 use crate::http::axum_implementation::responses::announce;
@@ -24,7 +24,7 @@ pub async fn handle(
 ) -> Response {
     debug!("http announce request: {:#?}", announce_request);
 
-    let peer_ip = match assign_ip_address_to_peer(tracker.config.on_reverse_proxy, &remote_client_ip) {
+    let peer_ip = match peer_ip::resolve(tracker.config.on_reverse_proxy, &remote_client_ip) {
         Ok(peer_ip) => peer_ip,
         Err(err) => return err,
     };
diff --git a/src/http/axum_implementation/handlers/scrape.rs b/src/http/axum_implementation/handlers/scrape.rs
index 2246ea7db..1f1d3ece9 100644
--- a/src/http/axum_implementation/handlers/scrape.rs
+++ b/src/http/axum_implementation/handlers/scrape.rs
@@ -1,29 +1,38 @@
 use std::sync::Arc;
 
 use axum::extract::State;
+use axum::http::StatusCode;
+use axum::response::{IntoResponse, Response};
 use log::debug;
 
+use crate::http::axum_implementation::extractors::peer_ip;
crate::http::axum_implementation::extractors::remote_client_ip::RemoteClientIp; use crate::http::axum_implementation::extractors::scrape_request::ExtractRequest; +use crate::http::axum_implementation::services; use crate::tracker::Tracker; #[allow(clippy::unused_async)] pub async fn handle( State(tracker): State>, ExtractRequest(scrape_request): ExtractRequest, - _remote_client_ip: RemoteClientIp, -) -> String { + remote_client_ip: RemoteClientIp, +) -> Response { debug!("http scrape request: {:#?}", &scrape_request); /* todo: - - Add the service that sends the event for statistics. - - Build the HTTP bencoded response. + - [x] Add the service that sends the event for statistics. + - [ ] Build the HTTP bencoded response. */ - let scrape_data = tracker.scrape(&scrape_request.info_hashes).await; + let peer_ip = match peer_ip::resolve(tracker.config.on_reverse_proxy, &remote_client_ip) { + Ok(peer_ip) => peer_ip, + Err(err) => return err, + }; + + let scrape_data = services::scrape::invoke(tracker.clone(), &scrape_request.info_hashes, &peer_ip).await; debug!("scrape data: {:#?}", &scrape_data); - "todo".to_string() + (StatusCode::OK, "todo").into_response() } diff --git a/src/http/axum_implementation/services/announce.rs b/src/http/axum_implementation/services/announce.rs index 6378c3008..356dbaeb9 100644 --- a/src/http/axum_implementation/services/announce.rs +++ b/src/http/axum_implementation/services/announce.rs @@ -9,7 +9,7 @@ pub async fn invoke(tracker: Arc, info_hash: InfoHash, peer: &mut Peer) let original_peer_ip = peer.peer_addr.ip(); // The tracker could change the original peer ip - let response = tracker.announce(&info_hash, peer, &original_peer_ip).await; + let announce_data = tracker.announce(&info_hash, peer, &original_peer_ip).await; match original_peer_ip { IpAddr::V4(_) => { @@ -20,5 +20,5 @@ pub async fn invoke(tracker: Arc, info_hash: InfoHash, peer: &mut Peer) } } - response + announce_data } diff --git a/src/http/axum_implementation/services/mod.rs b/src/http/axum_implementation/services/mod.rs index 74894de33..776d2dfbf 100644 --- a/src/http/axum_implementation/services/mod.rs +++ b/src/http/axum_implementation/services/mod.rs @@ -1 +1,2 @@ pub mod announce; +pub mod scrape; diff --git a/src/http/axum_implementation/services/scrape.rs b/src/http/axum_implementation/services/scrape.rs new file mode 100644 index 000000000..f40b8f999 --- /dev/null +++ b/src/http/axum_implementation/services/scrape.rs @@ -0,0 +1,20 @@ +use std::net::IpAddr; +use std::sync::Arc; + +use crate::protocol::info_hash::InfoHash; +use crate::tracker::{statistics, ScrapeData, Tracker}; + +pub async fn invoke(tracker: Arc, info_hashes: &Vec, original_peer_ip: &IpAddr) -> ScrapeData { + let scrape_data = tracker.scrape(info_hashes).await; + + match original_peer_ip { + IpAddr::V4(_) => { + tracker.send_stats_event(statistics::Event::Tcp4Scrape).await; + } + IpAddr::V6(_) => { + tracker.send_stats_event(statistics::Event::Tcp6Scrape).await; + } + } + + scrape_data +} diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index f9f6253fd..f9079962c 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -11,6 +11,8 @@ const CHANNEL_BUFFER_SIZE: usize = 65_535; #[derive(Debug, PartialEq, Eq)] pub enum Event { + // code-review: consider one single event for request type with data: Event::Announce { scheme: HTTPorUDP, ip_version: V4orV6 } + // Attributes are enums too. 
Tcp4Announce, Tcp4Scrape, Tcp6Announce, diff --git a/tests/http_tracker.rs b/tests/http_tracker.rs index a09802724..d324e560b 100644 --- a/tests/http_tracker.rs +++ b/tests/http_tracker.rs @@ -2354,8 +2354,7 @@ mod axum_http_tracker_server { assert_scrape_response(response, &expected_scrape_response).await; } - //#[tokio::test] - #[allow(dead_code)] + #[tokio::test] async fn should_increase_the_number_ot_tcp4_scrape_requests_handled_in_statistics() { let http_tracker = start_public_http_tracker(Version::Axum).await; @@ -2374,8 +2373,7 @@ mod axum_http_tracker_server { assert_eq!(stats.tcp4_scrapes_handled, 1); } - //#[tokio::test] - #[allow(dead_code)] + #[tokio::test] async fn should_increase_the_number_ot_tcp6_scrape_requests_handled_in_statistics() { let http_tracker = start_ipv6_http_tracker(Version::Axum).await; From 86ce93cb9e0d5113bccaeba9c16abe6ffdeafcad Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 27 Feb 2023 13:07:00 +0000 Subject: [PATCH 8/9] feat(http): [#192] scrape request for Axum HTTP tracker --- .../axum_implementation/handlers/scrape.rs | 13 +-- .../axum_implementation/requests/scrape.rs | 28 ++--- src/http/axum_implementation/responses/mod.rs | 1 + .../axum_implementation/responses/scrape.rs | 106 ++++++++++++++++++ src/tracker/mod.rs | 2 +- tests/http/asserts.rs | 46 +++++--- tests/http_tracker.rs | 30 +++-- 7 files changed, 162 insertions(+), 64 deletions(-) create mode 100644 src/http/axum_implementation/responses/scrape.rs diff --git a/src/http/axum_implementation/handlers/scrape.rs b/src/http/axum_implementation/handlers/scrape.rs index 1f1d3ece9..51b6fa84d 100644 --- a/src/http/axum_implementation/handlers/scrape.rs +++ b/src/http/axum_implementation/handlers/scrape.rs @@ -1,14 +1,13 @@ use std::sync::Arc; use axum::extract::State; -use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use log::debug; use crate::http::axum_implementation::extractors::peer_ip; use crate::http::axum_implementation::extractors::remote_client_ip::RemoteClientIp; use crate::http::axum_implementation::extractors::scrape_request::ExtractRequest; -use crate::http::axum_implementation::services; +use crate::http::axum_implementation::{responses, services}; use crate::tracker::Tracker; #[allow(clippy::unused_async)] @@ -19,12 +18,6 @@ pub async fn handle( ) -> Response { debug!("http scrape request: {:#?}", &scrape_request); - /* - todo: - - [x] Add the service that sends the event for statistics. - - [ ] Build the HTTP bencoded response. 
- */ - let peer_ip = match peer_ip::resolve(tracker.config.on_reverse_proxy, &remote_client_ip) { Ok(peer_ip) => peer_ip, Err(err) => return err, @@ -32,7 +25,5 @@ pub async fn handle( let scrape_data = services::scrape::invoke(tracker.clone(), &scrape_request.info_hashes, &peer_ip).await; - debug!("scrape data: {:#?}", &scrape_data); - - (StatusCode::OK, "todo").into_response() + responses::scrape::Bencoded::from(scrape_data).into_response() } diff --git a/src/http/axum_implementation/requests/scrape.rs b/src/http/axum_implementation/requests/scrape.rs index 0f23039bb..da50d4be5 100644 --- a/src/http/axum_implementation/requests/scrape.rs +++ b/src/http/axum_implementation/requests/scrape.rs @@ -10,8 +10,8 @@ use crate::protocol::info_hash::{ConversionError, InfoHash}; pub type NumberOfBytes = i64; -// Query param name -const INFO_HASH_SCRAPE_PARAM: &str = "info_hash"; +// Query param names +const INFO_HASH: &str = "info_hash"; #[derive(Debug, PartialEq)] pub struct Scrape { @@ -27,12 +27,6 @@ pub enum ParseScrapeQueryError { location: &'static Location<'static>, param_name: String, }, - #[error("invalid param value {param_value} for {param_name} in {location}")] - InvalidParam { - param_name: String, - param_value: String, - location: &'static Location<'static>, - }, #[error("invalid param value {param_value} for {param_name} in {source}")] InvalidInfoHashParam { param_name: String, @@ -60,14 +54,14 @@ impl TryFrom for Scrape { } fn extract_info_hashes(query: &Query) -> Result, ParseScrapeQueryError> { - match query.get_param_vec(INFO_HASH_SCRAPE_PARAM) { + match query.get_param_vec(INFO_HASH) { Some(raw_params) => { let mut info_hashes = vec![]; for raw_param in raw_params { let info_hash = percent_decode_info_hash(&raw_param).map_err(|err| ParseScrapeQueryError::InvalidInfoHashParam { - param_name: INFO_HASH_SCRAPE_PARAM.to_owned(), + param_name: INFO_HASH.to_owned(), param_value: raw_param.clone(), source: Located(err).into(), })?; @@ -80,7 +74,7 @@ fn extract_info_hashes(query: &Query) -> Result, ParseScrapeQueryE None => { return Err(ParseScrapeQueryError::MissingParam { location: Location::caller(), - param_name: INFO_HASH_SCRAPE_PARAM.to_owned(), + param_name: INFO_HASH.to_owned(), }) } } @@ -92,16 +86,12 @@ mod tests { mod scrape_request { use crate::http::axum_implementation::query::Query; - use crate::http::axum_implementation::requests::scrape::{Scrape, INFO_HASH_SCRAPE_PARAM}; + use crate::http::axum_implementation::requests::scrape::{Scrape, INFO_HASH}; use crate::protocol::info_hash::InfoHash; #[test] fn should_be_instantiated_from_the_url_query_with_only_one_infohash() { - let raw_query = Query::from(vec![( - INFO_HASH_SCRAPE_PARAM, - "%3B%24U%04%CF%5F%11%BB%DB%E1%20%1C%EAjk%F4Z%EE%1B%C0", - )]) - .to_string(); + let raw_query = Query::from(vec![(INFO_HASH, "%3B%24U%04%CF%5F%11%BB%DB%E1%20%1C%EAjk%F4Z%EE%1B%C0")]).to_string(); let query = raw_query.parse::().unwrap(); @@ -118,7 +108,7 @@ mod tests { mod when_it_is_instantiated_from_the_url_query_params { use crate::http::axum_implementation::query::Query; - use crate::http::axum_implementation::requests::scrape::{Scrape, INFO_HASH_SCRAPE_PARAM}; + use crate::http::axum_implementation::requests::scrape::{Scrape, INFO_HASH}; #[test] fn it_should_fail_if_the_query_does_not_include_the_info_hash_param() { @@ -129,7 +119,7 @@ mod tests { #[test] fn it_should_fail_if_the_info_hash_param_is_invalid() { - let raw_query = Query::from(vec![(INFO_HASH_SCRAPE_PARAM, "INVALID_INFO_HASH_VALUE")]).to_string(); + let raw_query = 
Query::from(vec![(INFO_HASH, "INVALID_INFO_HASH_VALUE")]).to_string(); assert!(Scrape::try_from(raw_query.parse::().unwrap()).is_err()); } diff --git a/src/http/axum_implementation/responses/mod.rs b/src/http/axum_implementation/responses/mod.rs index ad7d0a78c..7e8666934 100644 --- a/src/http/axum_implementation/responses/mod.rs +++ b/src/http/axum_implementation/responses/mod.rs @@ -1,3 +1,4 @@ pub mod announce; pub mod error; pub mod ok; +pub mod scrape; diff --git a/src/http/axum_implementation/responses/scrape.rs b/src/http/axum_implementation/responses/scrape.rs new file mode 100644 index 000000000..3fc34a0e5 --- /dev/null +++ b/src/http/axum_implementation/responses/scrape.rs @@ -0,0 +1,106 @@ +use std::borrow::Cow; + +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use bip_bencode::{ben_int, ben_map, BMutAccess}; + +use crate::tracker::ScrapeData; + +#[derive(Debug, PartialEq, Default)] +pub struct Bencoded { + scrape_data: ScrapeData, +} + +impl Bencoded { + /// # Panics + /// + /// Will return an error if it can't access the bencode as a mutable `BDictAccess`. + #[must_use] + pub fn body(&self) -> Vec { + let mut scrape_list = ben_map!(); + + let scrape_list_mut = scrape_list.dict_mut().unwrap(); + + for (info_hash, value) in &self.scrape_data.files { + scrape_list_mut.insert( + Cow::from(info_hash.bytes().to_vec()), + ben_map! { + "complete" => ben_int!(i64::from(value.complete)), + "downloaded" => ben_int!(i64::from(value.downloaded)), + "incomplete" => ben_int!(i64::from(value.incomplete)) + }, + ); + } + + (ben_map! { + "files" => scrape_list + }) + .encode() + } +} + +impl From for Bencoded { + fn from(scrape_data: ScrapeData) -> Self { + Self { scrape_data } + } +} + +impl IntoResponse for Bencoded { + fn into_response(self) -> Response { + (StatusCode::OK, self.body()).into_response() + } +} + +#[cfg(test)] +mod tests { + + mod scrape_response { + use crate::http::axum_implementation::responses::scrape::Bencoded; + use crate::protocol::info_hash::InfoHash; + use crate::tracker::torrent::SwarmMetadata; + use crate::tracker::ScrapeData; + + fn sample_scrape_data() -> ScrapeData { + let info_hash = InfoHash([0x69; 20]); + let mut scrape_data = ScrapeData::empty(); + scrape_data.add_file( + &info_hash, + SwarmMetadata { + complete: 1, + downloaded: 2, + incomplete: 3, + }, + ); + scrape_data + } + + #[test] + fn should_be_converted_from_scrape_data() { + let response = Bencoded::from(sample_scrape_data()); + + assert_eq!( + response, + Bencoded { + scrape_data: sample_scrape_data() + } + ); + } + + #[test] + fn should_be_bencoded() { + let response = Bencoded { + scrape_data: sample_scrape_data(), + }; + + let bytes = response.body(); + + // cspell:disable-next-line + let expected_bytes = b"d5:filesd20:iiiiiiiiiiiiiiiiiiiid8:completei1e10:downloadedi2e10:incompletei3eeee"; + + assert_eq!( + String::from_utf8(bytes).unwrap(), + String::from_utf8(expected_bytes.to_vec()).unwrap() + ); + } + } +} diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs index 0a3bd7c0b..3e5e97439 100644 --- a/src/tracker/mod.rs +++ b/src/tracker/mod.rs @@ -52,7 +52,7 @@ pub struct AnnounceData { #[derive(Debug, PartialEq, Default)] pub struct ScrapeData { - files: HashMap, + pub files: HashMap, } impl ScrapeData { diff --git a/tests/http/asserts.rs b/tests/http/asserts.rs index a10edc9e6..cd45571da 100644 --- a/tests/http/asserts.rs +++ b/tests/http/asserts.rs @@ -78,6 +78,36 @@ pub async fn assert_is_announce_response(response: Response) { // Error responses +// Specific 
errors for announce request + +pub async fn assert_missing_query_params_for_announce_request_error_response(response: Response) { + assert_eq!(response.status(), 200); + + assert_bencoded_error( + &response.text().await.unwrap(), + "missing query params for announce request", + Location::caller(), + ); +} + +pub async fn assert_bad_announce_request_error_response(response: Response, failure: &str) { + assert_cannot_parse_query_params_error_response(response, &format!(" for announce request: {failure}")).await; +} + +// Specific errors for scrape request + +pub async fn assert_missing_query_params_for_scrape_request_error_response(response: Response) { + assert_eq!(response.status(), 200); + + assert_bencoded_error( + &response.text().await.unwrap(), + "missing query params for scrape request", + Location::caller(), + ); +} + +// Other errors + pub async fn assert_internal_server_error_response(response: Response) { assert_eq!(response.status(), 200); @@ -156,22 +186,6 @@ pub async fn assert_invalid_remote_address_on_xff_header_error_response(response ); } -// Specific errors for announce request - -pub async fn assert_missing_query_params_for_announce_request_error_response(response: Response) { - assert_eq!(response.status(), 200); - - assert_bencoded_error( - &response.text().await.unwrap(), - "missing query params for announce request", - Location::caller(), - ); -} - -pub async fn assert_bad_announce_request_error_response(response: Response, failure: &str) { - assert_cannot_parse_query_params_error_response(response, &format!(" for announce request: {failure}")).await; -} - pub async fn assert_cannot_parse_query_param_error_response(response: Response, failure: &str) { assert_cannot_parse_query_params_error_response(response, &format!(": {failure}")).await; } diff --git a/tests/http_tracker.rs b/tests/http_tracker.rs index d324e560b..a341e13ed 100644 --- a/tests/http_tracker.rs +++ b/tests/http_tracker.rs @@ -2198,24 +2198,25 @@ mod axum_http_tracker_server { use torrust_tracker::tracker::peer; use crate::common::fixtures::{invalid_info_hashes, PeerBuilder}; - use crate::http::asserts::{assert_internal_server_error_response, assert_scrape_response}; + use crate::http::asserts::{ + assert_cannot_parse_query_params_error_response, assert_missing_query_params_for_scrape_request_error_response, + assert_scrape_response, + }; use crate::http::client::Client; use crate::http::requests; use crate::http::requests::scrape::QueryBuilder; use crate::http::responses::scrape::{self, File, ResponseBuilder}; use crate::http::server::{start_ipv6_http_tracker, start_public_http_tracker}; - //#[tokio::test] - #[allow(dead_code)] - async fn should_fail_when_the_request_is_empty() { + #[tokio::test] + async fn should_fail_when_the_url_query_component_is_empty() { let http_tracker_server = start_public_http_tracker(Version::Axum).await; let response = Client::new(http_tracker_server.get_connection_info()).get("scrape").await; - assert_internal_server_error_response(response).await; + assert_missing_query_params_for_scrape_request_error_response(response).await; } - //#[tokio::test] - #[allow(dead_code)] + #[tokio::test] async fn should_fail_when_the_info_hash_param_is_invalid() { let http_tracker_server = start_public_http_tracker(Version::Axum).await; @@ -2228,13 +2229,11 @@ mod axum_http_tracker_server { .get(&format!("announce?{params}")) .await; - // code-review: it's not returning the invalid info hash error - assert_internal_server_error_response(response).await; + 
        }
    }
 
-        //#[tokio::test]
-        #[allow(dead_code)]
+        #[tokio::test]
         async fn should_return_the_file_with_the_incomplete_peer_when_there_is_one_peer_with_bytes_pending_to_download() {
             let http_tracker = start_public_http_tracker(Version::Axum).await;
 
@@ -2272,8 +2271,7 @@ mod axum_http_tracker_server {
             assert_scrape_response(response, &expected_scrape_response).await;
         }
 
-        //#[tokio::test]
-        #[allow(dead_code)]
+        #[tokio::test]
         async fn should_return_the_file_with_the_complete_peer_when_there_is_one_peer_with_no_bytes_pending_to_download() {
             let http_tracker = start_public_http_tracker(Version::Axum).await;
 
@@ -2311,8 +2309,7 @@ mod axum_http_tracker_server {
             assert_scrape_response(response, &expected_scrape_response).await;
         }
 
-        //#[tokio::test]
-        #[allow(dead_code)]
+        #[tokio::test]
         async fn should_return_a_file_with_zeroed_values_when_there_are_no_peers() {
             let http_tracker = start_public_http_tracker(Version::Axum).await;
 
@@ -2329,8 +2326,7 @@ mod axum_http_tracker_server {
             assert_scrape_response(response, &scrape::Response::with_one_file(info_hash.bytes(), File::zeroed())).await;
         }
 
-        //#[tokio::test]
-        #[allow(dead_code)]
+        #[tokio::test]
         async fn should_accept_multiple_infohashes() {
             let http_tracker = start_public_http_tracker(Version::Axum).await;

From 4b3f9793970b9724d9757952cb09b9e0f95101fe Mon Sep 17 00:00:00 2001
From: Jose Celano
Date: Mon, 27 Feb 2023 13:46:17 +0000
Subject: [PATCH 9/9] refactor(udp): [#192] use new tracker::scrape method in
 UDP tracker

---
 src/tracker/torrent.rs |  4 ++--
 src/udp/handlers.rs    | 48 ++++++++++++++++++------------------------
 2 files changed, 22 insertions(+), 30 deletions(-)

diff --git a/src/tracker/torrent.rs b/src/tracker/torrent.rs
index 34017599d..dc41b083e 100644
--- a/src/tracker/torrent.rs
+++ b/src/tracker/torrent.rs
@@ -16,9 +16,9 @@ pub struct Entry {
 
 #[derive(Debug, PartialEq, Default)]
 pub struct SwarmMetadata {
-    pub complete: u32,   // The number of active peers that have completed downloading
+    pub complete: u32,   // The number of active peers that have completed downloading (seeders)
     pub downloaded: u32, // The number of peers that have ever completed downloading
-    pub incomplete: u32, // The number of active peers that have not completed downloading
+    pub incomplete: u32, // The number of active peers that have not completed downloading (leechers)
 }
 
 impl Entry {
diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs
index 8978beb70..6c54a6106 100644
--- a/src/udp/handlers.rs
+++ b/src/udp/handlers.rs
@@ -182,51 +182,43 @@ pub async fn handle_announce(
 /// # Errors
 ///
 /// This function never returns an error.
-///
-/// TODO: refactor this, db lock can be a lot shorter
 pub async fn handle_scrape(
     remote_addr: SocketAddr,
     request: &ScrapeRequest,
     tracker: Arc<Tracker>,
 ) -> Result<Response, Error> {
-    let db = tracker.get_torrents().await;
+    // Convert the aquatic info hashes into the tracker's internal `InfoHash` type
+    let mut info_hashes = vec![];
+    for info_hash in &request.info_hashes {
+        info_hashes.push(InfoHash(info_hash.0));
+    }
+
+    let scrape_data = tracker.scrape(&info_hashes).await;
 
     let mut torrent_stats: Vec<TorrentScrapeStatistics> = Vec::new();
 
-    for info_hash in &request.info_hashes {
-        let info_hash = InfoHash(info_hash.0);
-
-        let scrape_entry = match db.get(&info_hash) {
-            Some(torrent_info) => {
-                if tracker.authenticate_request(&info_hash, &None).await.is_ok() {
-                    let (seeders, completed, leechers) = torrent_info.get_stats();
-
-                    #[allow(clippy::cast_possible_truncation)]
-                    TorrentScrapeStatistics {
-                        seeders: NumberOfPeers(i64::from(seeders) as i32),
-                        completed: NumberOfDownloads(i64::from(completed) as i32),
-                        leechers: NumberOfPeers(i64::from(leechers) as i32),
-                    }
-                } else {
-                    TorrentScrapeStatistics {
-                        seeders: NumberOfPeers(0),
-                        completed: NumberOfDownloads(0),
-                        leechers: NumberOfPeers(0),
-                    }
-                }
+    for file in &scrape_data.files {
+        let info_hash = file.0;
+        let swarm_metadata = file.1;
+
+        let scrape_entry = if tracker.authenticate_request(info_hash, &None).await.is_ok() {
+            #[allow(clippy::cast_possible_truncation)]
+            TorrentScrapeStatistics {
+                seeders: NumberOfPeers(i64::from(swarm_metadata.complete) as i32),
+                completed: NumberOfDownloads(i64::from(swarm_metadata.downloaded) as i32),
+                leechers: NumberOfPeers(i64::from(swarm_metadata.incomplete) as i32),
             }
-            None => TorrentScrapeStatistics {
+        } else {
+            TorrentScrapeStatistics {
                 seeders: NumberOfPeers(0),
                 completed: NumberOfDownloads(0),
                 leechers: NumberOfPeers(0),
-            },
+            }
         };
 
         torrent_stats.push(scrape_entry);
     }
 
-    drop(db);
-
     // send stats event
     match remote_addr {
         SocketAddr::V4(_) => {