Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/shadowsocks-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ local-tunnel = ["local"]
# Enable socks4 protocol for sslocal
local-socks4 = ["local"]
# Enable Tun interface protocol for sslocal
local-tun = ["local", "etherparse", "tun", "smoltcp"]
local-tun = ["local", "etherparse", "tun", "smoltcp", "tokio-util"]
# Enable Fake DNS
local-fake-dns = ["local", "trust-dns", "rocksdb", "bson"]
# sslocal support online URL (SIP008 Online Configuration Delivery)
Expand Down Expand Up @@ -148,6 +148,7 @@ tokio = { version = "1.38", features = [
"time",
] }
tokio-native-tls = { version = "0.3", optional = true }
tokio-util = { version = "0.7", optional = true }
native-tls = { version = "0.2.8", optional = true, features = ["alpn"] }
webpki-roots = { version = "0.26", optional = true }
tokio-rustls = { version = "0.26", optional = true, default-features = false, features = [
Expand Down
78 changes: 36 additions & 42 deletions crates/shadowsocks-service/src/local/tun/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use std::{
atomic::{AtomicBool, Ordering},
},
task::{Context, Poll, Waker},
thread::{self, JoinHandle, Thread},
time::Duration,
};

use log::{debug, error, trace};
Expand Down Expand Up @@ -63,20 +61,6 @@ struct TcpSocketControl {
send_state: TcpSocketState,
}

struct ManagerNotify {
thread: Thread,
}

impl ManagerNotify {
fn new(thread: Thread) -> ManagerNotify {
ManagerNotify { thread }
}

fn notify(&self) {
self.thread.unpark();
}
}

struct TcpSocketManager {
device: VirtTunDevice,
iface: Interface,
Expand All @@ -94,7 +78,7 @@ struct TcpSocketCreation {

struct TcpConnection {
control: SharedTcpConnectionControl,
manager_notify: Arc<ManagerNotify>,
manager_notify: Arc<tokio::sync::Notify>,
}

impl Drop for TcpConnection {
Expand All @@ -109,15 +93,15 @@ impl Drop for TcpConnection {
control.send_state = TcpSocketState::Close;
}

self.manager_notify.notify();
self.manager_notify.notify_one();
}
}

impl TcpConnection {
fn new(
socket: TcpSocket<'static>,
socket_creation_tx: &mpsc::UnboundedSender<TcpSocketCreation>,
manager_notify: Arc<ManagerNotify>,
manager_notify: Arc<tokio::sync::Notify>,
tcp_opts: &TcpSocketOpts,
) -> impl Future<Output = TcpConnection> + use<> {
let send_buffer_size = tcp_opts.send_buffer_size.unwrap_or(DEFAULT_TCP_SEND_BUFFER_SIZE);
Expand Down Expand Up @@ -174,7 +158,7 @@ impl AsyncRead for TcpConnection {
buf.advance(n);

if n > 0 {
self.manager_notify.notify();
self.manager_notify.notify_one();
}
Ok(()).into()
}
Expand Down Expand Up @@ -204,7 +188,7 @@ impl AsyncWrite for TcpConnection {
let n = control.send_buffer.enqueue_slice(buf);

if n > 0 {
self.manager_notify.notify();
self.manager_notify.notify_one();
}
Ok(n).into()
}
Expand All @@ -231,17 +215,17 @@ impl AsyncWrite for TcpConnection {
}
}

self.manager_notify.notify();
self.manager_notify.notify_one();
Poll::Pending
}
}

pub struct TcpTun {
context: Arc<ServiceContext>,
manager_handle: Option<JoinHandle<()>>,
manager_notify: Arc<ManagerNotify>,
manager_handle: Option<tokio::task::JoinHandle<std::io::Result<()>>>,
manager_notify: Arc<tokio::sync::Notify>,
manager_socket_creation_tx: mpsc::UnboundedSender<TcpSocketCreation>,
manager_running: Arc<AtomicBool>,
manager_running: ::tokio_util::sync::CancellationToken,
balancer: PingBalancer,
iface_rx: mpsc::UnboundedReceiver<Vec<u8>>,
iface_tx: mpsc::UnboundedSender<Vec<u8>>,
Expand All @@ -250,9 +234,13 @@ pub struct TcpTun {

impl Drop for TcpTun {
fn drop(&mut self) {
self.manager_running.store(false, Ordering::Relaxed);
self.manager_notify.notify();
let _ = self.manager_handle.take().unwrap().join();
self.manager_running.cancel();
self.manager_notify.notify_one();
if let Some(handle) = self.manager_handle.take() {
log::debug!("TcpTun::drop, waiting for manager thread to exit");
std::thread::sleep(std::time::Duration::from_millis(100));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There must be another way to make it work gracefully.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I'm researching it.

handle.abort();
}
}
}

Expand Down Expand Up @@ -298,14 +286,15 @@ impl TcpTun {
socket_creation_rx: manager_socket_creation_rx,
};

let manager_running = Arc::new(AtomicBool::new(true));
let manager_running = ::tokio_util::sync::CancellationToken::new();
let manager_notify = Arc::new(tokio::sync::Notify::new());

let manager_handle = {
let manager_running = manager_running.clone();
let manager_notify = manager_notify.clone();

thread::Builder::new()
.name("smoltcp-poll".to_owned())
.spawn(move || {
tokio::spawn(async move {
{
let TcpSocketManager {
ref mut device,
ref mut iface,
Expand All @@ -315,8 +304,14 @@ impl TcpTun {
} = manager;

let mut socket_set = SocketSet::new(vec![]);

while manager_running.load(Ordering::Relaxed) {
let mut next_duration = std::time::Duration::ZERO;

loop {
tokio::select! {
_ = tokio::time::timeout(tokio::time::Duration::from(next_duration), manager_notify.notified()) => {}
_ = manager_running.cancelled() => break,
};
next_duration = std::time::Duration::ZERO;
while let Ok(TcpSocketCreation {
control,
socket,
Expand Down Expand Up @@ -470,22 +465,21 @@ impl TcpTun {
}

if !device.recv_available() {
let next_duration = iface
let _next_duration = iface
.poll_delay(before_poll, &socket_set)
.unwrap_or(SmolDuration::from_millis(5));
if next_duration != SmolDuration::ZERO {
thread::park_timeout(Duration::from(next_duration));
if _next_duration != SmolDuration::ZERO {
next_duration = _next_duration.into();
}
}
}

trace!("VirtDevice::poll thread exited");
})
.unwrap()
Ok(())
}
})
};

let manager_notify = Arc::new(ManagerNotify::new(manager_handle.thread().clone()));

TcpTun {
context,
manager_handle: Some(manager_handle),
Expand Down Expand Up @@ -558,7 +552,7 @@ impl TcpTun {

// Wake up and poll the interface.
self.iface_tx_avail.store(true, Ordering::Release);
self.manager_notify.notify();
self.manager_notify.notify_one();
}

pub async fn recv_packet(&mut self) -> Vec<u8> {
Expand Down