Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 12 additions & 19 deletions src/http/warp_implementation/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use warp::{reject, Rejection, Reply};

use super::error::Error;
use super::{request, response, WebResult};
use crate::http::warp_implementation::peer_builder;
use crate::protocol::info_hash::InfoHash;
use crate::tracker::{self, auth, peer, statistics, torrent};

Expand All @@ -31,32 +32,26 @@ pub async fn authenticate(
})
}

/// Handle announce request
///
/// # Errors
///
/// Will return `warp::Rejection` that wraps the `ServerError` if unable to `send_scrape_response`.
/// Will return `warp::Rejection` that wraps the `ServerError` if unable to `send_announce_response`.
pub async fn handle_announce(
announce_request: request::Announce,
auth_key: Option<auth::Key>,
tracker: Arc<tracker::Tracker>,
) -> WebResult<impl Reply> {
authenticate(&announce_request.info_hash, &auth_key, tracker.clone()).await?;
debug!("http announce request: {:#?}", announce_request);

debug!("{:?}", announce_request);
let info_hash = announce_request.info_hash;
let remote_client_ip = announce_request.peer_addr;

let peer = peer::Peer::from_http_announce_request(&announce_request, announce_request.peer_addr, tracker.config.get_ext_ip());
let torrent_stats = tracker
.update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer)
.await;
authenticate(&info_hash, &auth_key, tracker.clone()).await?;

// get all torrent peers excluding the peer_addr
let peers = tracker.get_torrent_peers(&announce_request.info_hash, &peer.peer_addr).await;
let mut peer = peer_builder::from_request(&announce_request, &remote_client_ip);

let announce_interval = tracker.config.announce_interval;
let response = tracker.announce(&info_hash, &mut peer, &remote_client_ip).await;

// send stats event
match announce_request.peer_addr {
match remote_client_ip {
IpAddr::V4(_) => {
tracker.send_stats_event(statistics::Event::Tcp4Announce).await;
}
Expand All @@ -67,15 +62,13 @@ pub async fn handle_announce(

send_announce_response(
&announce_request,
&torrent_stats,
&peers,
announce_interval,
&response.swam_stats,
&response.peers,
tracker.config.announce_interval,
tracker.config.min_announce_interval,
)
}

/// Handle scrape request
///
/// # Errors
///
/// Will return `warp::Rejection` that wraps the `ServerError` if unable to `send_scrape_response`.
Expand Down
1 change: 1 addition & 0 deletions src/http/warp_implementation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use warp::Rejection;
pub mod error;
pub mod filters;
pub mod handlers;
pub mod peer_builder;
pub mod request;
pub mod response;
pub mod routes;
Expand Down
32 changes: 32 additions & 0 deletions src/http/warp_implementation/peer_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::net::{IpAddr, SocketAddr};

use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes};

use super::request::Announce;
use crate::protocol::clock::{Current, Time};
use crate::tracker::peer::Peer;

#[must_use]
pub fn from_request(announce_request: &Announce, peer_ip: &IpAddr) -> Peer {
let event: AnnounceEvent = if let Some(event) = &announce_request.event {
match event.as_ref() {
"started" => AnnounceEvent::Started,
"stopped" => AnnounceEvent::Stopped,
"completed" => AnnounceEvent::Completed,
_ => AnnounceEvent::None,
}
} else {
AnnounceEvent::None
};

#[allow(clippy::cast_possible_truncation)]
Peer {
peer_id: announce_request.peer_id,
peer_addr: SocketAddr::new(*peer_ip, announce_request.port),
updated: Current::now(),
uploaded: NumberOfBytes(i128::from(announce_request.uploaded) as i64),
downloaded: NumberOfBytes(i128::from(announce_request.downloaded) as i64),
left: NumberOfBytes(i128::from(announce_request.left) as i64),
event,
}
}
133 changes: 131 additions & 2 deletions src/tracker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod torrent;

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::panic::Location;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -17,6 +17,8 @@ use tokio::sync::mpsc::error::SendError;
use tokio::sync::{RwLock, RwLockReadGuard};

use self::error::Error;
use self::peer::Peer;
use self::torrent::SwamStats;
use crate::config::Configuration;
use crate::databases::driver::Driver;
use crate::databases::{self, Database};
Expand All @@ -41,6 +43,11 @@ pub struct TorrentsMetrics {
pub torrents: u64,
}

pub struct AnnounceResponse {
pub peers: Vec<Peer>,
pub swam_stats: SwamStats,
}

impl Tracker {
/// # Errors
///
Expand Down Expand Up @@ -76,6 +83,23 @@ impl Tracker {
self.mode == mode::Mode::Listed || self.mode == mode::Mode::PrivateListed
}

/// It handles an announce request
pub async fn announce(&self, info_hash: &InfoHash, peer: &mut Peer, remote_client_ip: &IpAddr) -> AnnounceResponse {
peer.change_ip(&self.assign_ip_address_to_peer(remote_client_ip));

let swam_stats = self.update_torrent_with_peer_and_get_stats(info_hash, peer).await;

let peers = self.get_other_peers(info_hash, &peer.peer_addr).await;

AnnounceResponse { peers, swam_stats }
}

/// It assigns an IP address to the peer
#[must_use]
pub fn assign_ip_address_to_peer(&self, remote_client_ip: &IpAddr) -> IpAddr {
assign_ip_address_to_peer(remote_client_ip, self.config.get_ext_ip())
}

/// # Errors
///
/// Will return a `database::Error` if unable to add the `auth_key` to the database.
Expand Down Expand Up @@ -273,7 +297,7 @@ impl Tracker {
}

/// Get all torrent peers for a given torrent filtering out the peer with the client address
pub async fn get_torrent_peers(&self, info_hash: &InfoHash, client_addr: &SocketAddr) -> Vec<peer::Peer> {
pub async fn get_other_peers(&self, info_hash: &InfoHash, client_addr: &SocketAddr) -> Vec<peer::Peer> {
let read_lock = self.torrents.read().await;

match read_lock.get(info_hash) {
Expand Down Expand Up @@ -378,6 +402,15 @@ impl Tracker {
}
}

#[must_use]
pub fn assign_ip_address_to_peer(remote_client_ip: &IpAddr, tracker_external_ip: Option<IpAddr>) -> IpAddr {
if let Some(host_ip) = tracker_external_ip.filter(|_| remote_client_ip.is_loopback()) {
host_ip
} else {
*remote_client_ip
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -424,4 +457,100 @@ mod tests {
}
);
}

mod the_tracker_assigning_the_ip_to_the_peer {

use std::net::{IpAddr, Ipv4Addr};

use crate::tracker::assign_ip_address_to_peer;

#[test]
fn should_use_the_source_ip_instead_of_the_ip_in_the_announce_request() {
let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2));

let peer_ip = assign_ip_address_to_peer(&remote_ip, None);

assert_eq!(peer_ip, remote_ip);
}

mod when_the_client_ip_is_a_ipv4_loopback_ip {

use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::str::FromStr;

use crate::tracker::assign_ip_address_to_peer;

#[test]
fn it_should_use_the_loopback_ip_if_the_tracker_does_not_have_the_external_ip_configuration() {
let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);

let peer_ip = assign_ip_address_to_peer(&remote_ip, None);

assert_eq!(peer_ip, remote_ip);
}

#[test]
fn it_should_use_the_external_tracker_ip_in_tracker_configuration_if_it_is_defined() {
let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);

let tracker_external_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap());

let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));

assert_eq!(peer_ip, tracker_external_ip);
}

#[test]
fn it_should_use_the_external_ip_in_the_tracker_configuration_if_it_is_defined_even_if_the_external_ip_is_an_ipv6_ip()
{
let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);

let tracker_external_ip = IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap());

let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));

assert_eq!(peer_ip, tracker_external_ip);
}
}

mod when_client_ip_is_a_ipv6_loopback_ip {

use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::str::FromStr;

use crate::tracker::assign_ip_address_to_peer;

#[test]
fn it_should_use_the_loopback_ip_if_the_tracker_does_not_have_the_external_ip_configuration() {
let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);

let peer_ip = assign_ip_address_to_peer(&remote_ip, None);

assert_eq!(peer_ip, remote_ip);
}

#[test]
fn it_should_use_the_external_ip_in_tracker_configuration_if_it_is_defined() {
let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);

let tracker_external_ip = IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap());

let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));

assert_eq!(peer_ip, tracker_external_ip);
}

#[test]
fn it_should_use_the_external_ip_in_the_tracker_configuration_if_it_is_defined_even_if_the_external_ip_is_an_ipv4_ip()
{
let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);

let tracker_external_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap());

let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip));

assert_eq!(peer_ip, tracker_external_ip);
}
}
}
}
Loading