Skip to content

Commit e861880

Browse files
committed
Implement Serve feature
1 parent c3564f1 commit e861880

File tree

7 files changed

+367
-61
lines changed

7 files changed

+367
-61
lines changed

Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,24 @@ notify = { version = "2.5.4", optional = true }
2525
time = { version = "0.1.34", optional = true }
2626
crossbeam = { version = "0.2.8", optional = true }
2727

28+
# Serve feature
29+
iron = { version = "0.3", optional = true }
30+
staticfile = { version = "0.2", optional = true }
31+
websocket = { version = "0.16.1", optional = true}
32+
2833

2934
# Tests
3035
[dev-dependencies]
3136
tempdir = "0.3.4"
3237

3338

3439
[features]
35-
default = ["output", "watch"]
40+
default = ["output", "watch", "serve"]
3641
debug = []
3742
output = []
3843
regenerate-css = []
3944
watch = ["notify", "time", "crossbeam"]
45+
serve = ["iron", "staticfile", "websocket"]
4046

4147
[[bin]]
4248
doc = false

src/bin/livereload.rs

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
extern crate websocket;
2+
extern crate crossbeam;
3+
4+
use std::sync::mpsc::channel;
5+
use std::sync::mpsc;
6+
use std::io;
7+
use std::thread;
8+
use std::sync::{Arc, Mutex};
9+
use std::ops::Deref;
10+
use std::marker::PhantomData;
11+
12+
use self::websocket::header::WebSocketProtocol;
13+
use self::websocket::ws::sender::Sender;
14+
use self::websocket::ws::receiver::Receiver;
15+
use self::websocket::message::Type;
16+
use self::websocket::{Server, Message};
17+
18+
const WS_PROTOCOL: &'static str = "livereload";
19+
const RELOAD_COMMAND: &'static str = "reload";
20+
21+
22+
#[derive(Debug, Clone, PartialEq)]
23+
enum MessageType {
24+
Reload,
25+
Close,
26+
}
27+
28+
29+
#[derive(Clone)]
30+
struct ComparableSender<T> {
31+
sender: mpsc::Sender<T>,
32+
id: usize,
33+
}
34+
35+
impl<T> PartialEq for ComparableSender<T> {
36+
fn eq(&self, other: &Self) -> bool {
37+
self.id == other.id
38+
}
39+
}
40+
41+
impl<T> Deref for ComparableSender<T> {
42+
type Target = mpsc::Sender<T>;
43+
44+
fn deref(&self) -> &mpsc::Sender<T> {
45+
&self.sender
46+
}
47+
}
48+
49+
50+
struct ComparableSenderFactory<T> {
51+
next_id: usize,
52+
sender_type: PhantomData<T>,
53+
}
54+
55+
impl<T> ComparableSenderFactory<T> {
56+
fn generate(&mut self, sender: mpsc::Sender<T>) -> ComparableSender<T> {
57+
let tx = ComparableSender {
58+
sender: sender,
59+
id: self.next_id,
60+
};
61+
self.next_id += 1;
62+
tx
63+
}
64+
65+
fn new() -> ComparableSenderFactory<T> {
66+
ComparableSenderFactory {
67+
next_id: 0,
68+
sender_type: PhantomData,
69+
}
70+
}
71+
}
72+
73+
74+
pub struct LiveReload {
75+
senders: Arc<Mutex<Vec<ComparableSender<MessageType>>>>,
76+
}
77+
78+
impl LiveReload {
79+
pub fn new(address: &str) -> io::Result<LiveReload> {
80+
let server = try!(Server::bind(address));
81+
82+
let senders: Arc<Mutex<Vec<ComparableSender<MessageType>>>> = Arc::new(Mutex::new(vec![]));
83+
let senders_clone = senders.clone();
84+
85+
let mut factory = ComparableSenderFactory::new();
86+
87+
let lr = LiveReload { senders: senders_clone };
88+
89+
// handle connection attempts on a separate thread
90+
thread::spawn(move || {
91+
for connection in server {
92+
let mut senders = senders.clone();
93+
let (tx, rx) = channel();
94+
95+
let tx = factory.generate(tx);
96+
97+
senders.lock().unwrap().push(tx.clone());
98+
99+
// each connection gets a separate thread
100+
thread::spawn(move || {
101+
let request = connection.unwrap().read_request().unwrap();
102+
let headers = request.headers.clone();
103+
104+
let mut valid = false;
105+
if let Some(&WebSocketProtocol(ref protocols)) = headers.get() {
106+
if protocols.contains(&(WS_PROTOCOL.to_owned())) {
107+
valid = true;
108+
}
109+
}
110+
111+
let client;
112+
if valid {
113+
let mut response = request.accept();
114+
response.headers.set(WebSocketProtocol(vec![WS_PROTOCOL.to_owned()]));
115+
client = response.send().unwrap();
116+
} else {
117+
request.fail().send().unwrap();
118+
println!("{:?}", "Rejecting invalid websocket request.");
119+
return;
120+
}
121+
122+
let (mut ws_tx, mut ws_rx) = client.split();
123+
124+
// handle receiving and sending (websocket) in two separate threads
125+
crossbeam::scope(|scope| {
126+
let tx_clone = tx.clone();
127+
scope.spawn(move || {
128+
let tx = tx_clone;
129+
loop {
130+
match rx.recv() {
131+
Ok(msg) => {
132+
match msg {
133+
MessageType::Reload => {
134+
let message: Message = Message::text(RELOAD_COMMAND.to_owned());
135+
let mut senders = senders.clone();
136+
if ws_tx.send_message(&message).is_err() {
137+
// the receiver isn't available anymore
138+
// remove the tx from senders and exit
139+
LiveReload::remove_sender(&mut senders, &tx);
140+
break;
141+
}
142+
},
143+
MessageType::Close => {
144+
LiveReload::remove_sender(&mut senders, &tx);
145+
break;
146+
},
147+
}
148+
},
149+
Err(e) => {
150+
println!("{:?}", e);
151+
break;
152+
},
153+
}
154+
}
155+
});
156+
157+
for message in ws_rx.incoming_messages() {
158+
match message {
159+
Ok(message) => {
160+
let message: Message = message;
161+
match message.opcode {
162+
Type::Close => {
163+
tx.send(MessageType::Close).unwrap();
164+
break;
165+
},
166+
// TODO ?
167+
// Type::Ping => {
168+
// let message = websocket::Message::pong(message.payload);
169+
// ws_tx.send_message(&message).unwrap();
170+
// },
171+
_ => {
172+
println!("{:?}", message.opcode);
173+
unimplemented!()
174+
},
175+
}
176+
},
177+
Err(err) => {
178+
println!("Error: {}", err);
179+
break;
180+
},
181+
}
182+
}
183+
});
184+
});
185+
}
186+
});
187+
188+
Ok(lr)
189+
}
190+
191+
fn remove_sender(senders: &mut Arc<Mutex<Vec<ComparableSender<MessageType>>>>, el: &ComparableSender<MessageType>) {
192+
let mut senders = senders.lock().unwrap();
193+
let mut index = 0;
194+
for i in 0..senders.len() {
195+
if &senders[i] == el {
196+
index = i;
197+
break;
198+
}
199+
}
200+
senders.remove(index);
201+
}
202+
203+
pub fn trigger_reload(&self) {
204+
let senders = self.senders.lock().unwrap();
205+
println!("Reloading {} client(s).", senders.len());
206+
for sender in senders.iter() {
207+
sender.send(MessageType::Reload).unwrap();
208+
}
209+
}
210+
}

0 commit comments

Comments
 (0)