Skip to content

Commit e2b27cf

Browse files
committed
feat(mptcp): Basic implementation of Multipath-TCP
- Currently support macOS (Client Only) and Linux (> 5.19) - ref #1156
1 parent 115dba7 commit e2b27cf

File tree

15 files changed

+308
-169
lines changed

15 files changed

+308
-169
lines changed

crates/shadowsocks-service/src/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ struct SSConfig {
171171

172172
#[serde(skip_serializing_if = "Option::is_none")]
173173
no_delay: Option<bool>,
174+
174175
#[serde(skip_serializing_if = "Option::is_none")]
175176
keep_alive: Option<u64>,
176177

@@ -186,6 +187,9 @@ struct SSConfig {
186187
#[serde(skip_serializing_if = "Option::is_none")]
187188
fast_open: Option<bool>,
188189

190+
#[serde(skip_serializing_if = "Option::is_none")]
191+
mptcp: Option<bool>,
192+
189193
#[serde(skip_serializing_if = "Option::is_none")]
190194
#[cfg(any(target_os = "linux", target_os = "android"))]
191195
outbound_fwmark: Option<u32>,
@@ -1123,6 +1127,8 @@ pub struct Config {
11231127
///
11241128
/// If this is not set, sockets will be set with a default timeout
11251129
pub keep_alive: Option<Duration>,
1130+
/// Multipath-TCP
1131+
pub mptcp: bool,
11261132

11271133
/// `RLIMIT_NOFILE` option for *nix systems
11281134
#[cfg(all(unix, not(target_os = "android")))]
@@ -1260,6 +1266,7 @@ impl Config {
12601266
no_delay: false,
12611267
fast_open: false,
12621268
keep_alive: None,
1269+
mptcp: false,
12631270

12641271
#[cfg(all(unix, not(target_os = "android")))]
12651272
nofile: None,
@@ -1914,6 +1921,11 @@ impl Config {
19141921
nconfig.keep_alive = Some(Duration::from_secs(d));
19151922
}
19161923

1924+
// Multipath-TCP
1925+
if let Some(b) = config.mptcp {
1926+
nconfig.mptcp = b;
1927+
}
1928+
19171929
// UDP
19181930
nconfig.udp_timeout = config.udp_timeout.map(Duration::from_secs);
19191931

@@ -2587,6 +2599,10 @@ impl fmt::Display for Config {
25872599
jconf.keep_alive = Some(keepalive.as_secs());
25882600
}
25892601

2602+
if self.mptcp {
2603+
jconf.mptcp = Some(self.mptcp);
2604+
}
2605+
25902606
match self.dns {
25912607
DnsConfig::System => {}
25922608
#[cfg(feature = "trust-dns")]

crates/shadowsocks-service/src/local/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ pub async fn create(config: Config) -> io::Result<Server> {
147147
connect_opts.tcp.nodelay = config.no_delay;
148148
connect_opts.tcp.fastopen = config.fast_open;
149149
connect_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT));
150+
connect_opts.tcp.mptcp = config.mptcp;
150151
context.set_connect_opts(connect_opts);
151152

152153
let mut accept_opts = AcceptOpts {
@@ -158,6 +159,7 @@ pub async fn create(config: Config) -> io::Result<Server> {
158159
accept_opts.tcp.nodelay = config.no_delay;
159160
accept_opts.tcp.fastopen = config.fast_open;
160161
accept_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT));
162+
accept_opts.tcp.mptcp = config.mptcp;
161163
context.set_accept_opts(accept_opts);
162164

163165
if let Some(resolver) = build_dns_resolver(config.dns, config.ipv6_first, context.connect_opts_ref()).await {

crates/shadowsocks-service/src/manager/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub async fn run(config: Config) -> io::Result<()> {
5151
connect_opts.tcp.nodelay = config.no_delay;
5252
connect_opts.tcp.fastopen = config.fast_open;
5353
connect_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
54+
connect_opts.tcp.mptcp = config.mptcp;
5455

5556
let mut accept_opts = AcceptOpts {
5657
ipv6_only: config.ipv6_only,
@@ -61,6 +62,7 @@ pub async fn run(config: Config) -> io::Result<()> {
6162
accept_opts.tcp.nodelay = config.no_delay;
6263
accept_opts.tcp.fastopen = config.fast_open;
6364
accept_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
65+
accept_opts.tcp.mptcp = config.mptcp;
6466

6567
if let Some(resolver) = build_dns_resolver(config.dns, config.ipv6_first, &connect_opts).await {
6668
manager.set_dns_resolver(Arc::new(resolver));

crates/shadowsocks-service/src/server/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ pub async fn run(config: Config) -> io::Result<()> {
7878
connect_opts.tcp.nodelay = config.no_delay;
7979
connect_opts.tcp.fastopen = config.fast_open;
8080
connect_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
81+
connect_opts.tcp.mptcp = config.mptcp;
8182

8283
let mut accept_opts = AcceptOpts {
8384
ipv6_only: config.ipv6_only,
@@ -88,6 +89,7 @@ pub async fn run(config: Config) -> io::Result<()> {
8889
accept_opts.tcp.nodelay = config.no_delay;
8990
accept_opts.tcp.fastopen = config.fast_open;
9091
accept_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
92+
accept_opts.tcp.mptcp = config.mptcp;
9193

9294
let resolver = build_dns_resolver(config.dns, config.ipv6_first, &connect_opts)
9395
.await

crates/shadowsocks/src/net/option.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ pub struct TcpSocketOpts {
2020
/// `SO_KEEPALIVE` and sets `TCP_KEEPIDLE`, `TCP_KEEPINTVL` and `TCP_KEEPCNT` respectively,
2121
/// enables keep-alive messages on connection-oriented sockets
2222
pub keepalive: Option<Duration>,
23+
24+
/// Enable Multipath-TCP (mptcp)
25+
/// https://en.wikipedia.org/wiki/Multipath_TCP
26+
///
27+
/// Currently only supported on
28+
/// - macOS (iOS, watchOS, ...) with Client Support only.
29+
/// - Linux (>5.19)
30+
pub mptcp: bool,
2331
}
2432

2533
/// Options for connecting to remote server

crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use tokio_tfo::TfoStream;
2121
use crate::net::{
2222
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
2323
udp::{BatchRecvMessage, BatchSendMessage},
24+
AcceptOpts,
2425
AddrFamily,
2526
ConnectOpts,
2627
};
@@ -170,6 +171,14 @@ pub fn set_tcp_fastopen<S: AsRawFd>(socket: &S) -> io::Result<()> {
170171
Ok(())
171172
}
172173

174+
/// Create a TCP socket for listening
175+
pub async fn create_inbound_tcp_socket(bind_addr: &SocketAddr, _accept_opts: &AcceptOpts) -> io::Result<TcpSocket> {
176+
match bind_addr {
177+
SocketAddr::V4(..) => TcpSocket::new_v4(),
178+
SocketAddr::V6(..) => TcpSocket::new_v6(),
179+
}
180+
}
181+
173182
/// Disable IP fragmentation
174183
#[inline]
175184
pub fn set_disable_ip_fragmentation<S: AsRawFd>(af: AddrFamily, socket: &S) -> io::Result<()> {

crates/shadowsocks/src/net/sys/unix/bsd/macos.rs

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::{
22
io::{self, ErrorKind},
33
mem,
4-
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
5-
os::unix::io::{AsRawFd, RawFd},
4+
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream as StdTcpStream},
5+
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
66
pin::Pin,
77
ptr,
88
sync::atomic::{AtomicBool, Ordering},
@@ -13,14 +13,15 @@ use log::{debug, error, warn};
1313
use pin_project::pin_project;
1414
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
1515
use tokio::{
16-
io::{AsyncRead, AsyncWrite, ReadBuf},
16+
io::{AsyncRead, AsyncWrite, Interest, ReadBuf},
1717
net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket},
1818
};
1919
use tokio_tfo::TfoStream;
2020

2121
use crate::net::{
2222
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
2323
udp::{BatchRecvMessage, BatchSendMessage},
24+
AcceptOpts,
2425
AddrFamily,
2526
ConnectOpts,
2627
};
@@ -34,11 +35,34 @@ pub enum TcpStream {
3435

3536
impl TcpStream {
3637
pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
38+
if opts.tcp.mptcp {
39+
return TcpStream::connect_mptcp(addr, opts).await;
40+
}
41+
3742
let socket = match addr {
3843
SocketAddr::V4(..) => TcpSocket::new_v4()?,
3944
SocketAddr::V6(..) => TcpSocket::new_v6()?,
4045
};
4146

47+
TcpStream::connect_with_socket(socket, addr, opts).await
48+
}
49+
50+
async fn connect_mptcp(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
51+
// https://opensource.apple.com/source/xnu/xnu-4570.41.2/bsd/sys/socket.h.auto.html
52+
const AF_MULTIPATH: libc::c_int = 39;
53+
54+
let socket = unsafe {
55+
let fd = libc::socket(AF_MULTIPATH, libc::SOCK_STREAM, libc::IPPROTO_TCP);
56+
let socket = Socket::from_raw_fd(fd);
57+
socket.set_nonblocking(true)?;
58+
TcpSocket::from_raw_fd(socket.into_raw_fd())
59+
};
60+
61+
TcpStream::connect_with_socket(socket, addr, opts).await
62+
}
63+
64+
#[inline]
65+
async fn connect_with_socket(socket: TcpSocket, addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
4266
// Binds to a specific network interface (device)
4367
if let Some(ref iface) = opts.bind_interface {
4468
set_ip_bound_if(&socket, addr, iface)?;
@@ -48,7 +72,50 @@ impl TcpStream {
4872

4973
if !opts.tcp.fastopen {
5074
// If TFO is not enabled, it just works like a normal TcpStream
51-
let stream = socket.connect(addr).await?;
75+
//
76+
// But for Multipath-TCP, we must use connectx
77+
// http://blog.multipath-tcp.org/blog/html/2018/12/17/multipath_tcp_apis.html
78+
let stream = if opts.tcp.mptcp {
79+
let stream = unsafe {
80+
let raddr = SockAddr::from(addr);
81+
82+
let mut endpoints: libc::sa_endpoints_t = mem::zeroed();
83+
endpoints.sae_dstaddr = raddr.as_ptr();
84+
endpoints.sae_dstaddrlen = raddr.len();
85+
86+
let ret = libc::connectx(
87+
socket.as_raw_fd(),
88+
&endpoints as *const _,
89+
libc::SAE_ASSOCID_ANY,
90+
0,
91+
ptr::null(),
92+
0,
93+
ptr::null_mut(),
94+
ptr::null_mut(),
95+
);
96+
97+
if ret != 0 {
98+
let err = io::Error::last_os_error();
99+
if err.raw_os_error() != Some(libc::EINPROGRESS) {
100+
return Err(err);
101+
}
102+
}
103+
104+
let fd = socket.into_raw_fd();
105+
TokioTcpStream::from_std(StdTcpStream::from_raw_fd(fd))?
106+
};
107+
108+
stream.ready(Interest::WRITABLE).await?;
109+
110+
if let Err(err) = stream.take_error() {
111+
return Err(err);
112+
}
113+
114+
stream
115+
} else {
116+
socket.connect(addr).await?
117+
};
118+
52119
set_common_sockopt_after_connect(&stream, opts)?;
53120
return Ok(TcpStream::Standard(stream));
54121
}
@@ -155,6 +222,14 @@ pub fn set_tcp_fastopen<S: AsRawFd>(socket: &S) -> io::Result<()> {
155222
Ok(())
156223
}
157224

225+
/// Create a TCP socket for listening
226+
pub async fn create_inbound_tcp_socket(bind_addr: &SocketAddr, _accept_opts: &AcceptOpts) -> io::Result<TcpSocket> {
227+
match bind_addr {
228+
SocketAddr::V4(..) => TcpSocket::new_v4(),
229+
SocketAddr::V6(..) => TcpSocket::new_v6(),
230+
}
231+
}
232+
158233
fn set_ip_bound_if<S: AsRawFd>(socket: &S, addr: SocketAddr, iface: &str) -> io::Result<()> {
159234
const IP_BOUND_IF: libc::c_int = 25; // bsd/netinet/in.h
160235
const IPV6_BOUND_IF: libc::c_int = 125; // bsd/netinet6/in6.h

crates/shadowsocks/src/net/sys/unix/linux/mod.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{
22
io,
33
mem,
44
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
5-
os::unix::io::{AsRawFd, RawFd},
5+
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
66
pin::Pin,
77
ptr,
88
sync::atomic::{AtomicBool, Ordering},
@@ -22,6 +22,7 @@ use tokio_tfo::TfoStream;
2222
use crate::net::{
2323
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
2424
udp::{BatchRecvMessage, BatchSendMessage},
25+
AcceptOpts,
2526
AddrFamily,
2627
ConnectOpts,
2728
};
@@ -35,11 +36,24 @@ pub enum TcpStream {
3536

3637
impl TcpStream {
3738
pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
39+
if opts.tcp.mptcp {
40+
return TcpStream::connect_mptcp(addr, opts).await;
41+
}
42+
3843
let socket = match addr {
3944
SocketAddr::V4(..) => TcpSocket::new_v4()?,
4045
SocketAddr::V6(..) => TcpSocket::new_v6()?,
4146
};
4247

48+
TcpStream::connect_with_socket(socket, addr, opts).await
49+
}
50+
51+
async fn connect_mptcp(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
52+
let socket = create_mptcp_socket(&addr)?;
53+
TcpStream::connect_with_socket(socket, addr, opts).await
54+
}
55+
56+
async fn connect_with_socket(socket: TcpSocket, addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
4357
// Any traffic to localhost should not be protected
4458
// This is a workaround for VPNService
4559
#[cfg(target_os = "android")]
@@ -201,6 +215,31 @@ pub fn set_tcp_fastopen<S: AsRawFd>(socket: &S) -> io::Result<()> {
201215
Ok(())
202216
}
203217

218+
fn create_mptcp_socket(bind_addr: &SocketAddr) -> io::Result<TcpSocket> {
219+
unsafe {
220+
let family = match bind_addr {
221+
SocketAddr::V4(..) => libc::AF_INET,
222+
SocketAddr::V6(..) => libc::AF_INET6,
223+
};
224+
let fd = libc::socket(family, libc::SOCK_STREAM, libc::IPPROTO_MPTCP);
225+
let socket = Socket::from_raw_fd(fd);
226+
socket.set_nonblocking(true)?;
227+
Ok(TcpSocket::from_raw_fd(socket.into_raw_fd()))
228+
}
229+
}
230+
231+
/// Create a TCP socket for listening
232+
pub async fn create_inbound_tcp_socket(bind_addr: &SocketAddr, accept_opts: &AcceptOpts) -> io::Result<TcpSocket> {
233+
if accept_opts.tcp.mptcp {
234+
create_mptcp_socket(bind_addr)
235+
} else {
236+
match bind_addr {
237+
SocketAddr::V4(..) => TcpSocket::new_v4(),
238+
SocketAddr::V6(..) => TcpSocket::new_v6(),
239+
}
240+
}
241+
}
242+
204243
/// Disable IP fragmentation
205244
#[inline]
206245
pub fn set_disable_ip_fragmentation<S: AsRawFd>(af: AddrFamily, socket: &S) -> io::Result<()> {

0 commit comments

Comments
 (0)