Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions crates/shadowsocks-service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ struct SSConfig {

#[serde(skip_serializing_if = "Option::is_none")]
no_delay: Option<bool>,

#[serde(skip_serializing_if = "Option::is_none")]
keep_alive: Option<u64>,

Expand All @@ -186,6 +187,9 @@ struct SSConfig {
#[serde(skip_serializing_if = "Option::is_none")]
fast_open: Option<bool>,

#[serde(skip_serializing_if = "Option::is_none")]
mptcp: Option<bool>,

#[serde(skip_serializing_if = "Option::is_none")]
#[cfg(any(target_os = "linux", target_os = "android"))]
outbound_fwmark: Option<u32>,
Expand Down Expand Up @@ -1123,6 +1127,8 @@ pub struct Config {
///
/// If this is not set, sockets will be set with a default timeout
pub keep_alive: Option<Duration>,
/// Multipath-TCP
pub mptcp: bool,

/// `RLIMIT_NOFILE` option for *nix systems
#[cfg(all(unix, not(target_os = "android")))]
Expand Down Expand Up @@ -1260,6 +1266,7 @@ impl Config {
no_delay: false,
fast_open: false,
keep_alive: None,
mptcp: false,

#[cfg(all(unix, not(target_os = "android")))]
nofile: None,
Expand Down Expand Up @@ -1914,6 +1921,11 @@ impl Config {
nconfig.keep_alive = Some(Duration::from_secs(d));
}

// Multipath-TCP
if let Some(b) = config.mptcp {
nconfig.mptcp = b;
}

// UDP
nconfig.udp_timeout = config.udp_timeout.map(Duration::from_secs);

Expand Down Expand Up @@ -2587,6 +2599,10 @@ impl fmt::Display for Config {
jconf.keep_alive = Some(keepalive.as_secs());
}

if self.mptcp {
jconf.mptcp = Some(self.mptcp);
}

match self.dns {
DnsConfig::System => {}
#[cfg(feature = "trust-dns")]
Expand Down
2 changes: 2 additions & 0 deletions crates/shadowsocks-service/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ pub async fn create(config: Config) -> io::Result<Server> {
connect_opts.tcp.nodelay = config.no_delay;
connect_opts.tcp.fastopen = config.fast_open;
connect_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT));
connect_opts.tcp.mptcp = config.mptcp;
context.set_connect_opts(connect_opts);

let mut accept_opts = AcceptOpts {
Expand All @@ -158,6 +159,7 @@ pub async fn create(config: Config) -> io::Result<Server> {
accept_opts.tcp.nodelay = config.no_delay;
accept_opts.tcp.fastopen = config.fast_open;
accept_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT));
accept_opts.tcp.mptcp = config.mptcp;
context.set_accept_opts(accept_opts);

if let Some(resolver) = build_dns_resolver(config.dns, config.ipv6_first, context.connect_opts_ref()).await {
Expand Down
2 changes: 2 additions & 0 deletions crates/shadowsocks-service/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub async fn run(config: Config) -> io::Result<()> {
connect_opts.tcp.nodelay = config.no_delay;
connect_opts.tcp.fastopen = config.fast_open;
connect_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
connect_opts.tcp.mptcp = config.mptcp;

let mut accept_opts = AcceptOpts {
ipv6_only: config.ipv6_only,
Expand All @@ -61,6 +62,7 @@ pub async fn run(config: Config) -> io::Result<()> {
accept_opts.tcp.nodelay = config.no_delay;
accept_opts.tcp.fastopen = config.fast_open;
accept_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
accept_opts.tcp.mptcp = config.mptcp;

if let Some(resolver) = build_dns_resolver(config.dns, config.ipv6_first, &connect_opts).await {
manager.set_dns_resolver(Arc::new(resolver));
Expand Down
2 changes: 2 additions & 0 deletions crates/shadowsocks-service/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub async fn run(config: Config) -> io::Result<()> {
connect_opts.tcp.nodelay = config.no_delay;
connect_opts.tcp.fastopen = config.fast_open;
connect_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
connect_opts.tcp.mptcp = config.mptcp;

let mut accept_opts = AcceptOpts {
ipv6_only: config.ipv6_only,
Expand All @@ -88,6 +89,7 @@ pub async fn run(config: Config) -> io::Result<()> {
accept_opts.tcp.nodelay = config.no_delay;
accept_opts.tcp.fastopen = config.fast_open;
accept_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
accept_opts.tcp.mptcp = config.mptcp;

let resolver = build_dns_resolver(config.dns, config.ipv6_first, &connect_opts)
.await
Expand Down
8 changes: 8 additions & 0 deletions crates/shadowsocks/src/net/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ pub struct TcpSocketOpts {
/// `SO_KEEPALIVE` and sets `TCP_KEEPIDLE`, `TCP_KEEPINTVL` and `TCP_KEEPCNT` respectively,
/// enables keep-alive messages on connection-oriented sockets
pub keepalive: Option<Duration>,

/// Enable Multipath-TCP (mptcp)
/// https://en.wikipedia.org/wiki/Multipath_TCP
///
/// Currently only supported on
/// - macOS (iOS, watchOS, ...) with Client Support only.
/// - Linux (>5.19)
pub mptcp: bool,
}

/// Options for connecting to remote server
Expand Down
9 changes: 9 additions & 0 deletions crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio_tfo::TfoStream;
use crate::net::{
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
udp::{BatchRecvMessage, BatchSendMessage},
AcceptOpts,
AddrFamily,
ConnectOpts,
};
Expand Down Expand Up @@ -170,6 +171,14 @@ pub fn set_tcp_fastopen<S: AsRawFd>(socket: &S) -> io::Result<()> {
Ok(())
}

/// Create a TCP socket for listening
pub async fn create_inbound_tcp_socket(bind_addr: &SocketAddr, _accept_opts: &AcceptOpts) -> io::Result<TcpSocket> {
match bind_addr {
SocketAddr::V4(..) => TcpSocket::new_v4(),
SocketAddr::V6(..) => TcpSocket::new_v6(),
}
}

/// Disable IP fragmentation
#[inline]
pub fn set_disable_ip_fragmentation<S: AsRawFd>(af: AddrFamily, socket: &S) -> io::Result<()> {
Expand Down
83 changes: 79 additions & 4 deletions crates/shadowsocks/src/net/sys/unix/bsd/macos.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{
io::{self, ErrorKind},
mem,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
os::unix::io::{AsRawFd, RawFd},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream as StdTcpStream},
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
pin::Pin,
ptr,
sync::atomic::{AtomicBool, Ordering},
Expand All @@ -13,14 +13,15 @@ use log::{debug, error, warn};
use pin_project::pin_project;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
io::{AsyncRead, AsyncWrite, Interest, ReadBuf},
net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket},
};
use tokio_tfo::TfoStream;

use crate::net::{
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
udp::{BatchRecvMessage, BatchSendMessage},
AcceptOpts,
AddrFamily,
ConnectOpts,
};
Expand All @@ -34,11 +35,34 @@ pub enum TcpStream {

impl TcpStream {
pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
if opts.tcp.mptcp {
return TcpStream::connect_mptcp(addr, opts).await;
}

let socket = match addr {
SocketAddr::V4(..) => TcpSocket::new_v4()?,
SocketAddr::V6(..) => TcpSocket::new_v6()?,
};

TcpStream::connect_with_socket(socket, addr, opts).await
}

async fn connect_mptcp(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
// https://opensource.apple.com/source/xnu/xnu-4570.41.2/bsd/sys/socket.h.auto.html
const AF_MULTIPATH: libc::c_int = 39;

let socket = unsafe {
let fd = libc::socket(AF_MULTIPATH, libc::SOCK_STREAM, libc::IPPROTO_TCP);
let socket = Socket::from_raw_fd(fd);
socket.set_nonblocking(true)?;
TcpSocket::from_raw_fd(socket.into_raw_fd())
};

TcpStream::connect_with_socket(socket, addr, opts).await
}

#[inline]
async fn connect_with_socket(socket: TcpSocket, addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
// Binds to a specific network interface (device)
if let Some(ref iface) = opts.bind_interface {
set_ip_bound_if(&socket, addr, iface)?;
Expand All @@ -48,7 +72,50 @@ impl TcpStream {

if !opts.tcp.fastopen {
// If TFO is not enabled, it just works like a normal TcpStream
let stream = socket.connect(addr).await?;
//
// But for Multipath-TCP, we must use connectx
// http://blog.multipath-tcp.org/blog/html/2018/12/17/multipath_tcp_apis.html
let stream = if opts.tcp.mptcp {
let stream = unsafe {
let raddr = SockAddr::from(addr);

let mut endpoints: libc::sa_endpoints_t = mem::zeroed();
endpoints.sae_dstaddr = raddr.as_ptr();
endpoints.sae_dstaddrlen = raddr.len();

let ret = libc::connectx(
socket.as_raw_fd(),
&endpoints as *const _,
libc::SAE_ASSOCID_ANY,
0,
ptr::null(),
0,
ptr::null_mut(),
ptr::null_mut(),
);

if ret != 0 {
let err = io::Error::last_os_error();
if err.raw_os_error() != Some(libc::EINPROGRESS) {
return Err(err);
}
}

let fd = socket.into_raw_fd();
TokioTcpStream::from_std(StdTcpStream::from_raw_fd(fd))?
};

stream.ready(Interest::WRITABLE).await?;

if let Err(err) = stream.take_error() {
return Err(err);
}

stream
} else {
socket.connect(addr).await?
};

set_common_sockopt_after_connect(&stream, opts)?;
return Ok(TcpStream::Standard(stream));
}
Expand Down Expand Up @@ -155,6 +222,14 @@ pub fn set_tcp_fastopen<S: AsRawFd>(socket: &S) -> io::Result<()> {
Ok(())
}

/// Create a TCP socket for listening
pub async fn create_inbound_tcp_socket(bind_addr: &SocketAddr, _accept_opts: &AcceptOpts) -> io::Result<TcpSocket> {
match bind_addr {
SocketAddr::V4(..) => TcpSocket::new_v4(),
SocketAddr::V6(..) => TcpSocket::new_v6(),
}
}

fn set_ip_bound_if<S: AsRawFd>(socket: &S, addr: SocketAddr, iface: &str) -> io::Result<()> {
const IP_BOUND_IF: libc::c_int = 25; // bsd/netinet/in.h
const IPV6_BOUND_IF: libc::c_int = 125; // bsd/netinet6/in6.h
Expand Down
41 changes: 40 additions & 1 deletion crates/shadowsocks/src/net/sys/unix/linux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
io,
mem,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
os::unix::io::{AsRawFd, RawFd},
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
pin::Pin,
ptr,
sync::atomic::{AtomicBool, Ordering},
Expand All @@ -22,6 +22,7 @@ use tokio_tfo::TfoStream;
use crate::net::{
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
udp::{BatchRecvMessage, BatchSendMessage},
AcceptOpts,
AddrFamily,
ConnectOpts,
};
Expand All @@ -35,11 +36,24 @@ pub enum TcpStream {

impl TcpStream {
pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
if opts.tcp.mptcp {
return TcpStream::connect_mptcp(addr, opts).await;
}

let socket = match addr {
SocketAddr::V4(..) => TcpSocket::new_v4()?,
SocketAddr::V6(..) => TcpSocket::new_v6()?,
};

TcpStream::connect_with_socket(socket, addr, opts).await
}

async fn connect_mptcp(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
let socket = create_mptcp_socket(&addr)?;
TcpStream::connect_with_socket(socket, addr, opts).await
}

async fn connect_with_socket(socket: TcpSocket, addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
// Any traffic to localhost should not be protected
// This is a workaround for VPNService
#[cfg(target_os = "android")]
Expand Down Expand Up @@ -201,6 +215,31 @@ pub fn set_tcp_fastopen<S: AsRawFd>(socket: &S) -> io::Result<()> {
Ok(())
}

fn create_mptcp_socket(bind_addr: &SocketAddr) -> io::Result<TcpSocket> {
unsafe {
let family = match bind_addr {
SocketAddr::V4(..) => libc::AF_INET,
SocketAddr::V6(..) => libc::AF_INET6,
};
let fd = libc::socket(family, libc::SOCK_STREAM, libc::IPPROTO_MPTCP);
let socket = Socket::from_raw_fd(fd);
socket.set_nonblocking(true)?;
Ok(TcpSocket::from_raw_fd(socket.into_raw_fd()))
}
}

/// Create a TCP socket for listening
pub async fn create_inbound_tcp_socket(bind_addr: &SocketAddr, accept_opts: &AcceptOpts) -> io::Result<TcpSocket> {
if accept_opts.tcp.mptcp {
create_mptcp_socket(bind_addr)
} else {
match bind_addr {
SocketAddr::V4(..) => TcpSocket::new_v4(),
SocketAddr::V6(..) => TcpSocket::new_v6(),
}
}
}

/// Disable IP fragmentation
#[inline]
pub fn set_disable_ip_fragmentation<S: AsRawFd>(af: AddrFamily, socket: &S) -> io::Result<()> {
Expand Down
Loading