Skip to content

Commit c7989a2

Browse files
committed
feat(mptcp): Basic implementation of Multipath-TCP
- Currently support macOS (Client Only) and Linux (> 5.19) - ref #1156
1 parent 115dba7 commit c7989a2

File tree

14 files changed

+188
-61
lines changed

14 files changed

+188
-61
lines changed

crates/shadowsocks-service/src/config.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,7 @@ use serde::{Deserialize, Serialize};
6666
use shadowsocks::relay::socks5::Address;
6767
use shadowsocks::{
6868
config::{
69-
ManagerAddr,
70-
Mode,
71-
ReplayAttackPolicy,
72-
ServerAddr,
73-
ServerConfig,
74-
ServerUser,
75-
ServerUserManager,
76-
ServerWeight,
69+
ManagerAddr, Mode, ReplayAttackPolicy, ServerAddr, ServerConfig, ServerUser, ServerUserManager, ServerWeight,
7770
},
7871
crypto::CipherKind,
7972
plugin::PluginConfig,
@@ -1123,6 +1116,8 @@ pub struct Config {
11231116
///
11241117
/// If this is not set, sockets will be set with a default timeout
11251118
pub keep_alive: Option<Duration>,
1119+
/// Multipath-TCP
1120+
pub mptcp: bool,
11261121

11271122
/// `RLIMIT_NOFILE` option for *nix systems
11281123
#[cfg(all(unix, not(target_os = "android")))]
@@ -1260,6 +1255,7 @@ impl Config {
12601255
no_delay: false,
12611256
fast_open: false,
12621257
keep_alive: None,
1258+
mptcp: false,
12631259

12641260
#[cfg(all(unix, not(target_os = "android")))]
12651261
nofile: None,

crates/shadowsocks-service/src/local/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ pub async fn create(config: Config) -> io::Result<Server> {
147147
connect_opts.tcp.nodelay = config.no_delay;
148148
connect_opts.tcp.fastopen = config.fast_open;
149149
connect_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT));
150+
connect_opts.tcp.mptcp = config.mptcp;
150151
context.set_connect_opts(connect_opts);
151152

152153
let mut accept_opts = AcceptOpts {
@@ -158,6 +159,7 @@ pub async fn create(config: Config) -> io::Result<Server> {
158159
accept_opts.tcp.nodelay = config.no_delay;
159160
accept_opts.tcp.fastopen = config.fast_open;
160161
accept_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT));
162+
accept_opts.tcp.mptcp = config.mptcp;
161163
context.set_accept_opts(accept_opts);
162164

163165
if let Some(resolver) = build_dns_resolver(config.dns, config.ipv6_first, context.connect_opts_ref()).await {

crates/shadowsocks-service/src/manager/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub async fn run(config: Config) -> io::Result<()> {
5151
connect_opts.tcp.nodelay = config.no_delay;
5252
connect_opts.tcp.fastopen = config.fast_open;
5353
connect_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
54+
connect_opts.tcp.mptcp = config.mptcp;
5455

5556
let mut accept_opts = AcceptOpts {
5657
ipv6_only: config.ipv6_only,
@@ -61,6 +62,7 @@ pub async fn run(config: Config) -> io::Result<()> {
6162
accept_opts.tcp.nodelay = config.no_delay;
6263
accept_opts.tcp.fastopen = config.fast_open;
6364
accept_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
65+
accept_opts.tcp.mptcp = config.mptcp;
6466

6567
if let Some(resolver) = build_dns_resolver(config.dns, config.ipv6_first, &connect_opts).await {
6668
manager.set_dns_resolver(Arc::new(resolver));

crates/shadowsocks-service/src/server/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ pub async fn run(config: Config) -> io::Result<()> {
7878
connect_opts.tcp.nodelay = config.no_delay;
7979
connect_opts.tcp.fastopen = config.fast_open;
8080
connect_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
81+
connect_opts.tcp.mptcp = config.mptcp;
8182

8283
let mut accept_opts = AcceptOpts {
8384
ipv6_only: config.ipv6_only,
@@ -88,6 +89,7 @@ pub async fn run(config: Config) -> io::Result<()> {
8889
accept_opts.tcp.nodelay = config.no_delay;
8990
accept_opts.tcp.fastopen = config.fast_open;
9091
accept_opts.tcp.keepalive = config.keep_alive.or(Some(SERVER_DEFAULT_KEEPALIVE_TIMEOUT));
92+
accept_opts.tcp.mptcp = config.mptcp;
9193

9294
let resolver = build_dns_resolver(config.dns, config.ipv6_first, &connect_opts)
9395
.await

crates/shadowsocks/src/net/option.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ pub struct TcpSocketOpts {
2020
/// `SO_KEEPALIVE` and sets `TCP_KEEPIDLE`, `TCP_KEEPINTVL` and `TCP_KEEPCNT` respectively,
2121
/// enables keep-alive messages on connection-oriented sockets
2222
pub keepalive: Option<Duration>,
23+
24+
/// Enable Multipath-TCP (mptcp)
25+
/// https://en.wikipedia.org/wiki/Multipath_TCP
26+
pub mptcp: bool,
2327
}
2428

2529
/// Options for connecting to remote server

crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::{
2-
io,
3-
mem,
2+
io, mem,
43
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
54
os::unix::io::{AsRawFd, RawFd},
65
pin::Pin,
@@ -21,8 +20,7 @@ use tokio_tfo::TfoStream;
2120
use crate::net::{
2221
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
2322
udp::{BatchRecvMessage, BatchSendMessage},
24-
AddrFamily,
25-
ConnectOpts,
23+
AcceptOpts, AddrFamily, ConnectOpts,
2624
};
2725

2826
/// A `TcpStream` that supports TFO (TCP Fast Open)
@@ -170,6 +168,14 @@ pub fn set_tcp_fastopen<S: AsRawFd>(socket: &S) -> io::Result<()> {
170168
Ok(())
171169
}
172170

171+
/// Create a TCP socket for listening
172+
pub async fn create_inbound_tcp_socket(bind_addr: &SocketAddr, _accept_opts: &AcceptOpts) -> io::Result<TcpSocket> {
173+
match bind_addr {
174+
SocketAddr::V4(..) => TcpSocket::new_v4(),
175+
SocketAddr::V6(..) => TcpSocket::new_v6(),
176+
}
177+
}
178+
173179
/// Disable IP fragmentation
174180
#[inline]
175181
pub fn set_disable_ip_fragmentation<S: AsRawFd>(af: AddrFamily, socket: &S) -> io::Result<()> {

crates/shadowsocks/src/net/sys/unix/bsd/macos.rs

Lines changed: 79 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::{
22
io::{self, ErrorKind},
33
mem,
4-
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
5-
os::unix::io::{AsRawFd, RawFd},
4+
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream as StdTcpStream},
5+
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
66
pin::Pin,
77
ptr,
88
sync::atomic::{AtomicBool, Ordering},
@@ -13,16 +13,15 @@ use log::{debug, error, warn};
1313
use pin_project::pin_project;
1414
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
1515
use tokio::{
16-
io::{AsyncRead, AsyncWrite, ReadBuf},
16+
io::{AsyncRead, AsyncWrite, Interest, ReadBuf},
1717
net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket},
1818
};
1919
use tokio_tfo::TfoStream;
2020

2121
use crate::net::{
2222
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
2323
udp::{BatchRecvMessage, BatchSendMessage},
24-
AddrFamily,
25-
ConnectOpts,
24+
AcceptOpts, AddrFamily, ConnectOpts,
2625
};
2726

2827
/// A `TcpStream` that supports TFO (TCP Fast Open)
@@ -34,11 +33,34 @@ pub enum TcpStream {
3433

3534
impl TcpStream {
3635
pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
36+
if opts.tcp.mptcp {
37+
return TcpStream::connect_mptcp(addr, opts).await;
38+
}
39+
3740
let socket = match addr {
3841
SocketAddr::V4(..) => TcpSocket::new_v4()?,
3942
SocketAddr::V6(..) => TcpSocket::new_v6()?,
4043
};
4144

45+
TcpStream::connect_with_socket(socket, addr, opts).await
46+
}
47+
48+
async fn connect_mptcp(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
49+
// https://opensource.apple.com/source/xnu/xnu-4570.41.2/bsd/sys/socket.h.auto.html
50+
const AF_MULTIPATH: libc::c_int = 39;
51+
52+
let socket = unsafe {
53+
let fd = libc::socket(AF_MULTIPATH, libc::SOCK_STREAM, libc::IPPROTO_TCP);
54+
let socket = Socket::from_raw_fd(fd);
55+
socket.set_nonblocking(true)?;
56+
TcpSocket::from_raw_fd(socket.into_raw_fd())
57+
};
58+
59+
TcpStream::connect_with_socket(socket, addr, opts).await
60+
}
61+
62+
#[inline]
63+
async fn connect_with_socket(socket: TcpSocket, addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
4264
// Binds to a specific network interface (device)
4365
if let Some(ref iface) = opts.bind_interface {
4466
set_ip_bound_if(&socket, addr, iface)?;
@@ -48,7 +70,50 @@ impl TcpStream {
4870

4971
if !opts.tcp.fastopen {
5072
// If TFO is not enabled, it just works like a normal TcpStream
51-
let stream = socket.connect(addr).await?;
73+
//
74+
// But for Multipath-TCP, we must use connectx
75+
// http://blog.multipath-tcp.org/blog/html/2018/12/17/multipath_tcp_apis.html
76+
let stream = if opts.tcp.mptcp {
77+
let stream = unsafe {
78+
let raddr = SockAddr::from(addr);
79+
80+
let mut endpoints: libc::sa_endpoints_t = mem::zeroed();
81+
endpoints.sae_dstaddr = raddr.as_ptr();
82+
endpoints.sae_dstaddrlen = raddr.len();
83+
84+
let ret = libc::connectx(
85+
socket.as_raw_fd(),
86+
&endpoints as *const _,
87+
libc::SAE_ASSOCID_ANY,
88+
0,
89+
ptr::null(),
90+
0,
91+
ptr::null_mut(),
92+
ptr::null_mut(),
93+
);
94+
95+
if ret != 0 {
96+
let err = io::Error::last_os_error();
97+
if err.raw_os_error() != Some(libc::EINPROGRESS) {
98+
return Err(err);
99+
}
100+
}
101+
102+
let fd = socket.into_raw_fd();
103+
TokioTcpStream::from_std(StdTcpStream::from_raw_fd(fd))?
104+
};
105+
106+
stream.ready(Interest::WRITABLE).await?;
107+
108+
if let Err(err) = stream.take_error() {
109+
return Err(err);
110+
}
111+
112+
stream
113+
} else {
114+
socket.connect(addr).await?
115+
};
116+
52117
set_common_sockopt_after_connect(&stream, opts)?;
53118
return Ok(TcpStream::Standard(stream));
54119
}
@@ -155,6 +220,14 @@ pub fn set_tcp_fastopen<S: AsRawFd>(socket: &S) -> io::Result<()> {
155220
Ok(())
156221
}
157222

223+
/// Create a TCP socket for listening
224+
pub async fn create_inbound_tcp_socket(bind_addr: &SocketAddr, _accept_opts: &AcceptOpts) -> io::Result<TcpSocket> {
225+
match bind_addr {
226+
SocketAddr::V4(..) => TcpSocket::new_v4(),
227+
SocketAddr::V6(..) => TcpSocket::new_v6(),
228+
}
229+
}
230+
158231
fn set_ip_bound_if<S: AsRawFd>(socket: &S, addr: SocketAddr, iface: &str) -> io::Result<()> {
159232
const IP_BOUND_IF: libc::c_int = 25; // bsd/netinet/in.h
160233
const IPV6_BOUND_IF: libc::c_int = 125; // bsd/netinet6/in6.h

crates/shadowsocks/src/net/sys/unix/linux/mod.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use std::{
2-
io,
3-
mem,
2+
io, mem,
43
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
5-
os::unix::io::{AsRawFd, RawFd},
4+
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
65
pin::Pin,
76
ptr,
87
sync::atomic::{AtomicBool, Ordering},
@@ -22,8 +21,7 @@ use tokio_tfo::TfoStream;
2221
use crate::net::{
2322
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
2423
udp::{BatchRecvMessage, BatchSendMessage},
25-
AddrFamily,
26-
ConnectOpts,
24+
AcceptOpts, AddrFamily, ConnectOpts,
2725
};
2826

2927
/// A `TcpStream` that supports TFO (TCP Fast Open)
@@ -35,11 +33,24 @@ pub enum TcpStream {
3533

3634
impl TcpStream {
3735
pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
36+
if opts.tcp.mptcp {
37+
return TcpStream::connect_mptcp(addr, opts).await;
38+
}
39+
3840
let socket = match addr {
3941
SocketAddr::V4(..) => TcpSocket::new_v4()?,
4042
SocketAddr::V6(..) => TcpSocket::new_v6()?,
4143
};
4244

45+
TcpStream::connect_with_socket(socket, addr, opts).await
46+
}
47+
48+
async fn connect_mptcp(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
49+
let socket = create_mptcp_socket(&addr)?;
50+
TcpStream::connect_with_socket(socket, addr, opts).await
51+
}
52+
53+
async fn connect_with_socket(socket: TcpSocket, addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
4354
// Any traffic to localhost should not be protected
4455
// This is a workaround for VPNService
4556
#[cfg(target_os = "android")]
@@ -201,6 +212,31 @@ pub fn set_tcp_fastopen<S: AsRawFd>(socket: &S) -> io::Result<()> {
201212
Ok(())
202213
}
203214

215+
fn create_mptcp_socket(bind_addr: &SocketAddr) -> io::Result<TcpSocket> {
216+
unsafe {
217+
let family = match bind_addr {
218+
SocketAddr::V4(..) => libc::AF_INET,
219+
SocketAddr::V6(..) => libc::AF_INET6,
220+
};
221+
let fd = libc::socket(family, libc::SOCK_STREAM, libc::IPPROTO_MPTCP);
222+
let socket = Socket::from_raw_fd(fd);
223+
socket.set_nonblocking(true)?;
224+
Ok(TcpSocket::from_raw_fd(socket.into_raw_fd()))
225+
}
226+
}
227+
228+
/// Create a TCP socket for listening
229+
pub async fn create_inbound_tcp_socket(bind_addr: &SocketAddr, accept_opts: &AcceptOpts) -> io::Result<TcpSocket> {
230+
if accept_opts.tcp.mptcp {
231+
create_mptcp_socket(bind_addr)
232+
} else {
233+
match bind_addr {
234+
SocketAddr::V4(..) => TcpSocket::new_v4(),
235+
SocketAddr::V6(..) => TcpSocket::new_v6(),
236+
}
237+
}
238+
}
239+
204240
/// Disable IP fragmentation
205241
#[inline]
206242
pub fn set_disable_ip_fragmentation<S: AsRawFd>(af: AddrFamily, socket: &S) -> io::Result<()> {

crates/shadowsocks/src/net/sys/unix/others.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ use tokio::{
1414

1515
use crate::net::{
1616
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect},
17-
AddrFamily,
18-
ConnectOpts,
17+
AcceptOpts, AddrFamily, ConnectOpts,
1918
};
2019

2120
/// A wrapper of `TcpStream`
@@ -93,7 +92,16 @@ pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) ->
9392
Ok(socket)
9493
}
9594

95+
/// Enable TCP Fast Open
9696
pub fn set_tcp_fastopen<S: AsRawFd>(_: &S) -> io::Result<()> {
9797
let err = io::Error::new(ErrorKind::Other, "TFO is not supported in this platform");
9898
Err(err)
9999
}
100+
101+
/// Create a TCP socket for listening
102+
pub async fn create_inbound_tcp_socket(bind_addr: &SocketAddr, _accept_opts: &AcceptOpts) -> io::Result<TcpSocket> {
103+
match bind_addr {
104+
SocketAddr::V4(..) => TcpSocket::new_v4(),
105+
SocketAddr::V6(..) => TcpSocket::new_v6(),
106+
}
107+
}

0 commit comments

Comments
 (0)