diff --git a/.github/workflows/qns.yml b/.github/workflows/qns.yml index 1d83d31547..353d0ae696 100644 --- a/.github/workflows/qns.yml +++ b/.github/workflows/qns.yml @@ -2,7 +2,8 @@ name: QUIC Network Simulator on: schedule: - - cron: '42 3 * * *' # Runs at 03:42 UTC (m and h chosen arbitrarily) every day. + - cron: '42 3 * * 2,5' # Runs at 03:42 UTC (m and h chosen arbitrarily) twice a week. + workflow_dispatch: pull_request: branch: ["main"] paths: diff --git a/neqo-client/Cargo.toml b/neqo-client/Cargo.toml index 43ff45d9e5..e0d38becbd 100644 --- a/neqo-client/Cargo.toml +++ b/neqo-client/Cargo.toml @@ -11,7 +11,7 @@ license = "MIT OR Apache-2.0" [dependencies] neqo-crypto = { path = "./../neqo-crypto" } neqo-transport = { path = "./../neqo-transport" } -neqo-common = { path="./../neqo-common" } +neqo-common = { path="./../neqo-common", features = ["socket"] } neqo-http3 = { path = "./../neqo-http3" } neqo-qpack = { path = "./../neqo-qpack" } structopt = "0.3.7" diff --git a/neqo-client/src/main.rs b/neqo-client/src/main.rs index de7da48a27..864497bf44 100644 --- a/neqo-client/src/main.rs +++ b/neqo-client/src/main.rs @@ -11,7 +11,10 @@ use qlog::{events::EventImportance, streamer::QlogStreamer}; use mio::{net::UdpSocket, Events, Poll, PollOpt, Ready, Token}; -use neqo_common::{self as common, event::Provider, hex, qlog::NeqoQlog, Datagram, Role}; +use neqo_common::{ + self as common, bind, emit_datagram, event::Provider, hex, qlog::NeqoQlog, recv_datagram, + Datagram, Role, +}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, init, AuthenticationStatus, Cipher, ResumptionToken, @@ -345,14 +348,6 @@ impl QuicParameters { } } -fn emit_datagram(socket: &mio::net::UdpSocket, d: Datagram) -> io::Result<()> { - let sent = socket.send_to(&d[..], &d.destination())?; - if sent != d.len() { - eprintln!("Unable to send all {} bytes of datagram", d.len()); - } - Ok(()) -} - fn get_output_file( url: &Url, output_dir: &Option, @@ -413,7 +408,9 @@ fn process_loop( let mut datagrams: Vec = Vec::new(); 'read: loop { - match socket.recv_from(&mut buf[..]) { + let mut tos = 0; + let mut ttl = 0; + match recv_datagram(socket, &mut buf[..], &mut tos, &mut ttl) { Err(ref err) if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::Interrupted => @@ -430,7 +427,7 @@ fn process_loop( break 'read; } if sz > 0 { - let d = Datagram::new(remote, *local_addr, &buf[..sz]); + let d = Datagram::new(remote, *local_addr, tos, ttl, &buf[..sz]); datagrams.push(d); } } @@ -450,7 +447,7 @@ fn process_loop( 'write: loop { match client.process_output(Instant::now()) { Output::Datagram(dgram) => { - if let Err(e) = emit_datagram(socket, dgram) { + if let Err(e) = emit_datagram(socket, &dgram) { eprintln!("UDP write error: {e}"); client.close(Instant::now(), 0, e.to_string()); exiting = true; @@ -1046,12 +1043,12 @@ fn main() -> Res<()> { SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0), }; - let socket = match UdpSocket::bind(&local_addr) { + let socket = match bind(&local_addr) { Err(e) => { eprintln!("Unable to bind UDP socket: {e}"); exit(1) } - Ok(s) => s, + Ok(s) => UdpSocket::from_socket(s)?, }; let poll = Poll::new()?; @@ -1140,7 +1137,7 @@ mod old { use super::{qlog_new, KeyUpdateState, Res}; use mio::{Events, Poll}; - use neqo_common::{event::Provider, Datagram}; + use neqo_common::{event::Provider, recv_datagram, Datagram}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_transport::{ Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId, @@ -1348,7 +1345,9 @@ mod old { )?; 'read: loop { - match socket.recv_from(&mut buf[..]) { + let mut tos = 0; + let mut ttl = 0; + match recv_datagram(socket, &mut buf[..], &mut tos, &mut ttl) { Err(ref err) if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::Interrupted => @@ -1365,7 +1364,7 @@ mod old { break 'read; } if sz > 0 { - let d = Datagram::new(remote, *local_addr, &buf[..sz]); + let d = Datagram::new(remote, *local_addr, tos, ttl, &buf[..sz]); client.process_input(&d, Instant::now()); handler.maybe_key_update(client)?; } @@ -1382,7 +1381,7 @@ mod old { 'write: loop { match client.process_output(Instant::now()) { Output::Datagram(dgram) => { - if let Err(e) = emit_datagram(socket, dgram) { + if let Err(e) = emit_datagram(socket, &dgram) { eprintln!("UDP write error: {e}"); client.close(Instant::now(), 0, e.to_string()); exiting = true; diff --git a/neqo-common/Cargo.toml b/neqo-common/Cargo.toml index 25d72980ca..6ed29b5606 100644 --- a/neqo-common/Cargo.toml +++ b/neqo-common/Cargo.toml @@ -7,16 +7,30 @@ rust-version = "1.70.0" license = "MIT OR Apache-2.0" build = "build.rs" +[build-dependencies] +cfg_aliases = "0.2.0" + [dependencies] -log = {version = "0.4.0", default-features = false} -env_logger = {version = "0.10", default-features = false} +log = { version = "0.4.0", default-features = false } +enum-map = "2.7.3" +env_logger = { version = "0.10", default-features = false } lazy_static = "1.3.0" +mio = { version = "0.6.17", optional = true } qlog = "0.11.0" -time = {version = "0.3", features = ["formatting"]} +time = { version = "0.3", features = ["formatting"] } +nix = { git = "https://github.com/larseggert/nix.git", branch = "feat-tos", features = [ + "socket", + "net", + "uio", +], optional = true } + +[dev-dependencies] +test-fixture = { path = "../test-fixture" } [features] deny-warnings = [] ci = [] +socket = ["dep:nix", "dep:mio"] [target."cfg(windows)".dependencies.winapi] version = "0.3" diff --git a/neqo-common/build.rs b/neqo-common/build.rs index 0af1a1dbbd..0b7ee269b3 100644 --- a/neqo-common/build.rs +++ b/neqo-common/build.rs @@ -1,3 +1,4 @@ +use cfg_aliases::cfg_aliases; use std::env; fn main() { @@ -5,4 +6,8 @@ fn main() { if target.contains("windows") { println!("cargo:rustc-link-lib=winmm"); } + + cfg_aliases! { + posix_socket: { any(target_os = "macos", target_os = "linux", target_os = "android") }, + } } diff --git a/neqo-common/src/datagram.rs b/neqo-common/src/datagram.rs index 0316dd2309..43df3fe0a2 100644 --- a/neqo-common/src/datagram.rs +++ b/neqo-common/src/datagram.rs @@ -7,20 +7,97 @@ use std::net::SocketAddr; use std::ops::Deref; +use enum_map::Enum; + use crate::hex_with_len; +// ECN (Explicit Congestion Notification) codepoints mapped to the +// lower 2 bits of the TOS field. +// https://www.iana.org/assignments/dscp-registry/dscp-registry.xhtml +#[derive(Copy, Clone, PartialEq, Eq, Enum)] +#[repr(u8)] +pub enum IpTosEcn { + NotEct = 0b00, // Not-ECT (Not ECN-Capable Transport) [RFC3168] + Ect1 = 0b01, // ECT(1) (ECN-Capable Transport(1))[1] [RFC8311][RFC Errata 5399][RFC9331] + Ect0 = 0b10, // ECT(0) (ECN-Capable Transport(0)) [RFC3168] + Ce = 0b11, // CE (Congestion Experienced) [RFC3168] +} + +impl From for IpTosEcn { + fn from(v: u8) -> Self { + match v & 0b11 { + 0b00 => IpTosEcn::NotEct, + 0b01 => IpTosEcn::Ect1, + 0b10 => IpTosEcn::Ect0, + 0b11 => IpTosEcn::Ce, + _ => unreachable!(), + } + } +} + +impl From for u8 { + fn from(val: IpTosEcn) -> Self { + val as u8 + } +} + +impl std::fmt::Debug for IpTosEcn { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + IpTosEcn::NotEct => f.write_str("Not-ECT"), + IpTosEcn::Ect1 => f.write_str("ECT(1)"), + IpTosEcn::Ect0 => f.write_str("ECT(0)"), + IpTosEcn::Ce => f.write_str("CE"), + } + } +} + +// DiffServ Codepoints, mapped to the upper six bits of the TOS field. +// https://www.iana.org/assignments/dscp-registry/dscp-registry.xhtml +#[derive(Copy, Clone, PartialEq, Eq)] +#[repr(u8)] +pub enum IpTosDscp { + Cs0 = 0b0000_0000, // [RFC2474] + Cs1 = 0b0010_0000, // [RFC2474] + Cs2 = 0b0100_0000, // [RFC2474] + Cs3 = 0b0110_0000, // [RFC2474] + Cs4 = 0b1000_0000, // [RFC2474] + Cs5 = 0b1010_0000, // [RFC2474] + Cs6 = 0b1100_0000, // [RFC2474] + Cs7 = 0b1110_0000, // [RFC2474] + Af11 = 0b0010_1000, // [RFC2597] + Af12 = 0b0011_0000, // [RFC2597] + Af13 = 0b0011_1000, // [RFC2597] + Af21 = 0b0100_1000, // [RFC2597] + Af22 = 0b0101_0000, // [RFC2597] + Af23 = 0b0101_1000, // [RFC2597] + Af31 = 0b0110_1000, // [RFC2597] + Af32 = 0b0111_0000, // [RFC2597] + Af33 = 0b0111_1000, // [RFC2597] + Af41 = 0b1000_1000, // [RFC2597] + Af42 = 0b1001_0000, // [RFC2597] + Af43 = 0b1001_1000, // [RFC2597] + Ef = 0b1011_1000, // [RFC3246] + VoiceAdmit = 0b1011_0000, // [RFC5865] + Le = 0b0000_0100, // [RFC8622] +} + #[derive(PartialEq, Eq, Clone)] pub struct Datagram { src: SocketAddr, dst: SocketAddr, + tos: u8, + ttl: u8, d: Vec, } impl Datagram { - pub fn new>>(src: SocketAddr, dst: SocketAddr, d: V) -> Self { + pub fn new>>(src: SocketAddr, dst: SocketAddr, tos: u8, ttl: u8, d: V) -> Self { Self { src, dst, + tos, + ttl, d: d.into(), } } @@ -34,6 +111,16 @@ impl Datagram { pub fn destination(&self) -> SocketAddr { self.dst } + + #[must_use] + pub fn tos(&self) -> u8 { + self.tos + } + + #[must_use] + pub fn ttl(&self) -> u8 { + self.ttl + } } impl Deref for Datagram { diff --git a/neqo-common/src/lib.rs b/neqo-common/src/lib.rs index 3fb0fd27ec..eadbcc4e95 100644 --- a/neqo-common/src/lib.rs +++ b/neqo-common/src/lib.rs @@ -15,14 +15,18 @@ pub mod hrtime; mod incrdecoder; pub mod log; pub mod qlog; +#[cfg(feature = "socket")] +pub mod socket; pub mod timer; pub use self::codec::{Decoder, Encoder}; -pub use self::datagram::Datagram; +pub use self::datagram::{Datagram, IpTosDscp, IpTosEcn}; pub use self::header::Header; pub use self::incrdecoder::{ IncrementalDecoderBuffer, IncrementalDecoderIgnore, IncrementalDecoderUint, }; +#[cfg(feature = "socket")] +pub use self::socket::{bind, emit_datagram, recv_datagram}; use std::fmt::Write; diff --git a/neqo-common/src/qlog.rs b/neqo-common/src/qlog.rs index ac03ecfcb0..5ff74750b0 100644 --- a/neqo-common/src/qlog.rs +++ b/neqo-common/src/qlog.rs @@ -48,6 +48,11 @@ impl NeqoQlog { }) } + #[must_use] + pub fn inner(&self) -> Rc>> { + Rc::clone(&self.inner) + } + /// Create a disabled `NeqoQlog` configuration. #[must_use] pub fn disabled() -> Self { @@ -144,3 +149,39 @@ pub fn new_trace(role: Role) -> qlog::TraceSeq { }), } } + +#[cfg(test)] +mod test { + use qlog::events::Event; + use test_fixture::EXPECTED_LOG_HEADER; + + const EV_DATA: qlog::events::EventData = + qlog::events::EventData::SpinBitUpdated(qlog::events::connectivity::SpinBitUpdated { + state: true, + }); + + const EXPECTED_LOG_EVENT: &str = concat!( + "\u{1e}", + r#"{"time":0.0,"name":"connectivity:spin_bit_updated","data":{"state":true}}"#, + "\n" + ); + + #[test] + fn new_neqo_qlog() { + let (_log, contents) = test_fixture::new_neqo_qlog(); + assert_eq!(contents.to_string(), EXPECTED_LOG_HEADER); + } + + #[test] + fn add_event() { + let (mut log, contents) = test_fixture::new_neqo_qlog(); + log.add_event(|| Some(Event::with_time(1.1, EV_DATA))); + assert_eq!( + contents.to_string(), + format!( + "{EXPECTED_LOG_HEADER}{e}", + e = EXPECTED_LOG_EVENT.replace("\"time\":0.0,", "\"time\":1.1,") + ) + ); + } +} diff --git a/neqo-common/src/socket.rs b/neqo-common/src/socket.rs new file mode 100644 index 0000000000..6280da874a --- /dev/null +++ b/neqo-common/src/socket.rs @@ -0,0 +1,429 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{ + io::{self}, + net::SocketAddr, +}; + +#[cfg(posix_socket)] +use std::{ + io::{Error, ErrorKind, IoSlice, IoSliceMut}, + os::fd::AsRawFd, +}; + +#[cfg(posix_socket)] +use nix::{ + cmsg_space, + sys::socket::{ + recvmsg, sendmsg, setsockopt, + sockopt::{ + IpDontFrag, IpRecvTos, IpRecvTtl, Ipv6DontFrag, Ipv6RecvHopLimit, Ipv6RecvTClass, + }, + AddressFamily, + ControlMessage::{IpTos, IpTtl, Ipv6HopLimit, Ipv6TClass}, + ControlMessageOwned, MsgFlags, SockaddrLike, SockaddrStorage, + }, +}; + +use crate::Datagram; + +#[cfg(posix_socket)] +fn configure_sockopts(s: &std::net::UdpSocket, local_addr: &SocketAddr) { + // Don't let the host stack or network path fragment our IP packets + // (RFC9000, Section 14). + let res = match local_addr { + SocketAddr::V4(..) => setsockopt(&s, IpDontFrag, &true), + SocketAddr::V6(..) => setsockopt(&s, Ipv6DontFrag, &true), + }; + debug_assert!(res.is_ok()); + // Request IPv4 type-of-service (TOS) and IPv6 traffic class + // information for all incoming packets. + let res = match local_addr { + SocketAddr::V4(..) => setsockopt(&s, IpRecvTos, &true), + SocketAddr::V6(..) => setsockopt(&s, Ipv6RecvTClass, &true), + }; + debug_assert!(res.is_ok()); + // Request IPv4 time-to-live (TTL) and IPv6 hop count + // information for all incoming packets. + let res = match local_addr { + SocketAddr::V4(..) => setsockopt(&s, IpRecvTtl, &true), + SocketAddr::V6(..) => setsockopt(&s, Ipv6RecvHopLimit, &true), + }; + debug_assert!(res.is_ok()); +} + +/// Binds a `std::net::UdpSocket` socket to the specified local address and sets it to +/// non-blocking mode. +/// +/// # Arguments +/// +/// * `local_addr` - The local `SocketAddr` to bind the socket to. +/// +/// # Returns +/// +/// The bound UDP socket. +/// +/// # Errors +/// +/// Returns an `io::Error` if the UDP socket fails to bind to the specified local address +/// or if the socket fails to be set to non-blocking mode. +/// +/// # Notes +/// +/// This function binds the UDP socket to the specified local address. It also tries to +/// perform additional configuration on the socket, such as setting socket options to +/// request TOS and TTL information for incoming packets. If that additional configuration +/// fails, the function will still return. +/// +pub fn bind(local_addr: &SocketAddr) -> io::Result { + match std::net::UdpSocket::bind(local_addr) { + Err(e) => { + eprintln!("Unable to bind UDP socket: {e}"); + Err(e) + } + Ok(s) => { + if let Err(e) = s.set_nonblocking(true) { + eprintln!("Unable to set UDP socket to non-blocking mode: {e}"); + return Err(e); + } + #[cfg(posix_socket)] + configure_sockopts(&s, local_addr); + Ok(s) + } + } +} + +pub trait UdpIo { + /// Send the UDP datagram on the specified socket. + /// + /// # Arguments + /// + /// * `d` - The datagram to send. + /// + /// # Returns + /// + /// An `io::Result` indicating whether the datagram was sent successfully. + /// + /// # Errors + /// + /// Returns an `io::Error` if the UDP socket fails to send the datagram. + /// + fn send(&self, d: &Datagram) -> io::Result; + + /// Receive a UDP datagram on the specified socket. + /// + /// # Arguments + /// + /// * `buf` - The buffer to receive the datagram into. + /// * `tos` - The type-of-service (TOS) or traffic class (TC) value of the received datagram. + /// * `ttl` - The time-to-live (TTL) or hop limit (HL) value of the received datagram. + /// + /// # Returns + /// + /// An `io::Result` indicating the size of the received datagram and the source address. + /// + /// # Errors + /// + /// Returns an `io::Error` if the UDP socket fails to receive the datagram. + /// + fn recv(&self, buf: &mut [u8], tos: &mut u8, ttl: &mut u8) -> io::Result<(usize, SocketAddr)>; +} + +fn emit_result(result: io::Result, len: usize) -> usize { + let sent = result.unwrap(); + if sent != len { + eprintln!("Only able to send {sent}/{len} bytes of datagram"); + } + sent +} + +#[cfg(posix_socket)] +fn emit_datagram_posix(socket: &S, d: &Datagram) -> io::Result { + let iov = [IoSlice::new(&d[..])]; + let tos = i32::from(d.tos()); + let ttl = i32::from(d.ttl()); + let cmsgs = match d.destination() { + SocketAddr::V4(..) => [IpTos(&tos), IpTtl(&ttl)], + SocketAddr::V6(..) => [Ipv6TClass(&tos), Ipv6HopLimit(&ttl)], + }; + match sendmsg( + socket.as_raw_fd(), + &iov, + &cmsgs, + MsgFlags::empty(), + Some(&SockaddrStorage::from(d.destination())), + ) { + Ok(res) => Ok(res), + Err(e) => Err(Error::from_raw_os_error(e as i32)), + } +} + +#[cfg(posix_socket)] +fn to_socket_addr(addr: &SockaddrStorage) -> SocketAddr { + match addr.family().unwrap() { + AddressFamily::Inet => { + let addr = addr.as_sockaddr_in().unwrap(); + SocketAddr::new(std::net::IpAddr::V4(addr.ip()), addr.port()) + } + AddressFamily::Inet6 => { + let addr = addr.as_sockaddr_in6().unwrap(); + SocketAddr::new(std::net::IpAddr::V6(addr.ip()), addr.port()) + } + _ => unreachable!(), + } +} + +/// Use `recvmsg` to receive a UDP datagram and its metadata on the specified socket. +/// +/// # Arguments +/// +/// * `socket` - The UDP socket to receive the datagram on. +/// * `buf` - The buffer to receive the datagram into. +/// * `tos` - The type-of-service (TOS) or traffic class (TC) value of the received datagram. +/// * `ttl` - The time-to-live (TTL) or hop limit (HL) value of the received datagram. +/// +/// # Returns +/// +/// An `io::Result` indicating the size of the received datagram and the source address. +/// +/// # Errors +/// +/// Returns an `io::Error` if the UDP socket fails to receive the datagram. +/// +#[cfg(posix_socket)] +pub fn recv_datagram_posix( + socket: &S, + buf: &mut [u8], + tos: &mut u8, + ttl: &mut u8, +) -> io::Result<(usize, SocketAddr)> { + let mut iov = [IoSliceMut::new(buf)]; + let mut cmsg = cmsg_space!(u8, u8); + let flags = MsgFlags::empty(); + + match recvmsg::(socket.as_raw_fd(), &mut iov, Some(&mut cmsg), flags) { + Err(e) => Err(Error::from_raw_os_error(e as i32)), + Ok(res) => { + for cmsg in res.cmsgs() { + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + // All valid values fit in u8. + match cmsg { + ControlMessageOwned::IpTos(t) | ControlMessageOwned::Ipv6TClass(t) => { + *tos = t as u8; + } + ControlMessageOwned::IpTtl(t) | ControlMessageOwned::Ipv6HopLimit(t) => { + *ttl = t as u8; + } + _ => unreachable!(), + }; + } + let Some(addr) = res.address else { + return Err(Error::new( + ErrorKind::Other, + "Unable to retrieve source address from datagram", + )); + }; + Ok((res.bytes, to_socket_addr(&addr))) + } + } +} + +impl UdpIo for std::net::UdpSocket { + #[cfg(posix_socket)] + fn send(&self, d: &Datagram) -> io::Result { + let res = emit_result(emit_datagram_posix(self, d), d.len()); + Ok(res) + } + + #[cfg(not(posix_socket))] + fn send(&self, d: &Datagram) -> io::Result { + let res = emit_result(self.send_to(&d[..], d.destination()), d.len()); + Ok(res) + } + + #[cfg(posix_socket)] + fn recv(&self, buf: &mut [u8], tos: &mut u8, ttl: &mut u8) -> io::Result<(usize, SocketAddr)> { + recv_datagram_posix(self, buf, tos, ttl) + } + + #[cfg(not(posix_socket))] + fn recv(&self, buf: &mut [u8], tos: &mut u8, ttl: &mut u8) -> io::Result<(usize, SocketAddr)> { + *tos = IpTosEcn::NotEct.into(); + *ttl = 0xff; + self.recv_from(&mut buf[..]) + } +} + +impl UdpIo for mio::net::UdpSocket { + #[cfg(posix_socket)] + fn send(&self, d: &Datagram) -> io::Result { + let res = emit_result(emit_datagram_posix(self, d), d.len()); + Ok(res) + } + #[cfg(not(posix_socket))] + fn send(&self, d: &Datagram) -> io::Result { + let res = emit_result(self.send_to(&d[..], &d.destination()), d.len()); + Ok(res) + } + + #[cfg(posix_socket)] + fn recv(&self, buf: &mut [u8], tos: &mut u8, ttl: &mut u8) -> io::Result<(usize, SocketAddr)> { + recv_datagram_posix(self, buf, tos, ttl) + } + + #[cfg(not(posix_socket))] + fn recv(&self, buf: &mut [u8], tos: &mut u8, ttl: &mut u8) -> io::Result<(usize, SocketAddr)> { + *tos = IpTosEcn::NotEct.into(); + *ttl = 0xff; + self.recv_from(&mut buf[..]) + } +} + +/// Send the UDP datagram on the specified socket. +/// +/// # Arguments +/// +/// * `socket` - The UDP socket to send the datagram on. +/// * `d` - The datagram to send. +/// +/// # Returns +/// +/// An `io::Result` indicating whether the datagram was sent successfully. +/// +/// # Errors +/// +/// Returns an `io::Error` if the UDP socket fails to send the datagram. +/// +pub fn emit_datagram(socket: &S, d: &Datagram) -> io::Result { + socket.send(d) +} + +/// Receive a UDP datagram on the specified socket. +/// +/// # Arguments +/// +/// * `socket` - The UDP socket to receive the datagram on. +/// * `buf` - The buffer to receive the datagram into. +/// * `tos` - The type-of-service (TOS) or traffic class (TC) value of the received datagram. +/// * `ttl` - The time-to-live (TTL) or hop limit (HL) value of the received datagram. +/// +/// # Returns +/// +/// An `io::Result` indicating the size of the received datagram and the source address. +/// +/// # Errors +/// +/// Returns an `io::Error` if the UDP socket fails to receive the datagram. +/// +pub fn recv_datagram( + socket: &S, + buf: &mut [u8], + tos: &mut u8, + ttl: &mut u8, +) -> io::Result<(usize, SocketAddr)> { + socket.recv(buf, tos, ttl) +} + +#[cfg(test)] +mod test { + use crate::{bind, emit_datagram, recv_datagram, Datagram, IpTosEcn}; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + + const ADDR_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1); + const ADDR_V6: Ipv6Addr = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1); + const ADDR_V4_INVALID: Ipv4Addr = Ipv4Addr::new(0, 0, 0, 1); + const ADDR_V6_INVALID: Ipv6Addr = Ipv6Addr::new(1, 0, 0, 0, 0, 0, 0, 0); + const SOCK_V4: &SocketAddr = &SocketAddr::new(IpAddr::V4(ADDR_V4), 0); + const SOCK_V6: &SocketAddr = &SocketAddr::new(IpAddr::V6(ADDR_V6), 0); + + fn bind_ok(sock_addr: &SocketAddr) { + assert!(bind(sock_addr).is_ok()); + } + + fn bind_err(sock_addr: &SocketAddr) { + assert!(bind(sock_addr).is_err()); + } + + fn io(sock_addr: &SocketAddr) { + // We reconfigure the sockets to blocking mode for this test, so we don't have to poll. + let server = match bind(sock_addr) { + Ok(s) => { + s.set_nonblocking(false) + .expect("Unable to set server socket to blocking mode"); + s + } + Err(e) => panic!("{}", e), + }; + + let client = match bind(sock_addr) { + Ok(s) => { + s.set_nonblocking(false) + .expect("Unable to set client socket to blocking mode"); + s + } + Err(e) => panic!("{}", e), + }; + + let d = Datagram::new( + client.local_addr().unwrap(), + server.local_addr().unwrap(), + IpTosEcn::Ce.into(), + 16, + [0x42; 16], + ); + + let res = emit_datagram(&client, &d); + assert!(res.is_ok()); + + let mut buf = [0; 16]; + let mut tos = 0; + let mut ttl = 0; + let res = recv_datagram(&server, &mut buf, &mut tos, &mut ttl); + assert!(res.is_ok()); + #[cfg(posix_socket)] + // On non-POSIX platforms, the TOS byte will usually always be 0 on RX, so don't check it there. + assert_eq!(tos, IpTosEcn::Ce as u8); + // MacOS has a kernel bug that prevents setting the TTL via CMSG, so just check that it is not 0. + assert_ne!(ttl, 0); + assert_eq!(buf, [0x42; 16]); + + drop(client); + drop(server); + } + + #[test] + fn bind_v4() { + bind_ok(SOCK_V4); + } + + #[test] + fn bind_v6() { + bind_ok(SOCK_V6); + } + + #[test] + fn bind_err_v4() { + const INVAL_V4: &SocketAddr = &SocketAddr::new(IpAddr::V4(ADDR_V4_INVALID), 0); + bind_err(INVAL_V4); + } + + #[test] + fn bind_err_v6() { + const INVAL_V6: &SocketAddr = &SocketAddr::new(IpAddr::V6(ADDR_V6_INVALID), 0); + bind_err(INVAL_V6); + } + + #[test] + fn io_v4() { + io(SOCK_V4); + } + + #[test] + fn io_v6() { + io(SOCK_V6); + } +} diff --git a/neqo-http3/src/lib.rs b/neqo-http3/src/lib.rs index 76be301a8e..c0c093a897 100644 --- a/neqo-http3/src/lib.rs +++ b/neqo-http3/src/lib.rs @@ -95,7 +95,7 @@ match client.process_output(Instant::now()) { // Reading new data coming for the network. match socket.recv_from(&mut buf[..]) { Ok((sz, remote)) => { - let d = Datagram::new(remote, *local_addr, &buf[..sz]); + let d = Datagram::new(remote, *local_addr, tos, ttl, &buf[..sz]); client.process_input(d, Instant::now()); } Err(err) => { diff --git a/neqo-interop/Cargo.toml b/neqo-interop/Cargo.toml index 7660b0f1d0..fac44f0cad 100644 --- a/neqo-interop/Cargo.toml +++ b/neqo-interop/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0" [dependencies] neqo-crypto = { path = "./../neqo-crypto" } neqo-transport = { path = "./../neqo-transport" } -neqo-common = { path="./../neqo-common" } +neqo-common = { path="./../neqo-common", features = ["socket"] } neqo-http3 = { path = "./../neqo-http3" } neqo-qpack = { path = "./../neqo-qpack" } diff --git a/neqo-interop/src/main.rs b/neqo-interop/src/main.rs index ef5646ea73..755ff69c6e 100644 --- a/neqo-interop/src/main.rs +++ b/neqo-interop/src/main.rs @@ -7,7 +7,7 @@ #![cfg_attr(feature = "deny-warnings", deny(warnings))] #![warn(clippy::use_self)] -use neqo_common::{event::Provider, hex, Datagram}; +use neqo_common::{bind, emit_datagram, event::Provider, hex, recv_datagram, Datagram}; use neqo_crypto::{init, AuthenticationStatus, ResumptionToken}; use neqo_http3::{Header, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority}; use neqo_transport::{ @@ -60,13 +60,6 @@ trait Handler { } } -fn emit_datagram(socket: &UdpSocket, d: Datagram) { - let sent = socket.send(&d[..]).expect("Error sending datagram"); - if sent != d.len() { - eprintln!("Unable to send all {} bytes of datagram", d.len()); - } -} - lazy_static::lazy_static! { static ref TEST_TIMEOUT: Mutex = Mutex::new(Duration::from_secs(5)); } @@ -116,7 +109,10 @@ fn process_loop( match output { Output::Datagram(dgram) => { let dgram = handler.rewrite_out(&dgram).unwrap_or(dgram); - emit_datagram(&nctx.socket, dgram); + if let Err(e) = emit_datagram(&nctx.socket, &dgram) { + eprintln!("UDP write error: {e}"); + continue; + } } Output::Callback(duration) => { let delay = min(timer.check()?, duration); @@ -133,7 +129,9 @@ fn process_loop( return Ok(client.state().clone()); } - let sz = match nctx.socket.recv(&mut buf[..]) { + let mut tos = 0; + let mut ttl = 0; + let (sz, _) = match recv_datagram(&nctx.socket, &mut buf[..], &mut tos, &mut ttl) { Ok(sz) => sz, Err(e) => { return Err(String::from(match e.kind() { @@ -148,7 +146,7 @@ fn process_loop( continue; } if sz > 0 { - let received = Datagram::new(nctx.remote_addr, nctx.local_addr, &buf[..sz]); + let received = Datagram::new(nctx.remote_addr, nctx.local_addr, tos, ttl, &buf[..sz]); client.process_input(&received, Instant::now()); } } @@ -279,7 +277,12 @@ fn process_loop_h3( loop { let output = handler.h3.conn().process_output(Instant::now()); match output { - Output::Datagram(dgram) => emit_datagram(&nctx.socket, dgram), + Output::Datagram(dgram) => { + if let Err(e) = emit_datagram(&nctx.socket, &dgram) { + eprintln!("UDP write error: {e}"); + break; + } + } Output::Callback(duration) => { let delay = min(timer.check()?, duration); nctx.socket.set_read_timeout(Some(delay)).unwrap(); @@ -294,7 +297,9 @@ fn process_loop_h3( return Ok(handler.h3.conn().state().clone()); } - let sz = match nctx.socket.recv(&mut buf[..]) { + let mut tos = 0; + let mut ttl = 0; + let (sz, _) = match recv_datagram(&nctx.socket, &mut buf[..], &mut tos, &mut ttl) { Ok(sz) => sz, Err(e) => { return Err(String::from(match e.kind() { @@ -309,7 +314,7 @@ fn process_loop_h3( continue; } if sz > 0 { - let received = Datagram::new(nctx.remote_addr, nctx.local_addr, &buf[..sz]); + let received = Datagram::new(nctx.remote_addr, nctx.local_addr, tos, ttl, &buf[..sz]); handler.h3.process_input(&received, Instant::now()); } } @@ -682,7 +687,13 @@ impl Handler for VnHandler { fn rewrite_out(&mut self, d: &Datagram) -> Option { let mut payload = d[..].to_vec(); payload[1] = 0x1a; - Some(Datagram::new(d.source(), d.destination(), payload)) + Some(Datagram::new( + d.source(), + d.destination(), + d.tos(), + d.ttl(), + payload, + )) } } @@ -704,7 +715,7 @@ fn test_vn(nctx: &NetworkCtx, peer: &Peer) -> Connection { } fn run_test<'t>(peer: &Peer, test: &'t Test) -> (&'t Test, String) { - let socket = UdpSocket::bind(peer.bind()).expect("Unable to bind UDP socket"); + let socket = bind(&peer.bind()).expect("Unable to bind UDP socket"); socket.connect(peer).expect("Unable to connect UDP socket"); let local_addr = socket.local_addr().expect("Socket local address not bound"); diff --git a/neqo-server/Cargo.toml b/neqo-server/Cargo.toml index 09ac930d50..5ee17c2262 100644 --- a/neqo-server/Cargo.toml +++ b/neqo-server/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0" [dependencies] neqo-crypto = { path = "./../neqo-crypto" } neqo-transport = { path = "./../neqo-transport" } -neqo-common = { path="./../neqo-common" } +neqo-common = { path="./../neqo-common", features = ["socket"] } neqo-http3 = { path = "./../neqo-http3" } neqo-qpack = { path = "./../neqo-qpack" } structopt = "0.3.7" diff --git a/neqo-server/src/main.rs b/neqo-server/src/main.rs index ac4c952837..492be37d08 100644 --- a/neqo-server/src/main.rs +++ b/neqo-server/src/main.rs @@ -30,7 +30,9 @@ use mio_extras::timer::{Builder, Timeout, Timer}; use neqo_transport::ConnectionIdGenerator; use structopt::StructOpt; -use neqo_common::{hex, qdebug, qinfo, qwarn, Datagram, Header}; +use neqo_common::{ + bind, emit_datagram, hex, qdebug, qinfo, qwarn, recv_datagram, Datagram, Header, +}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, generate_ech_keys, init_db, random, AntiReplay, Cipher, @@ -317,15 +319,6 @@ impl QuicParameters { } } -fn emit_packet(socket: &mut UdpSocket, out_dgram: Datagram) { - let sent = socket - .send_to(&out_dgram, &out_dgram.destination()) - .expect("Error sending datagram"); - if sent != out_dgram.len() { - eprintln!("Unable to send all {} bytes of datagram", out_dgram.len()); - } -} - fn qns_read_response(filename: &str) -> Option> { let mut file_path = PathBuf::from("/www"); file_path.push(filename.trim_matches(|p| p == '/')); @@ -585,11 +578,13 @@ impl HttpServer for SimpleServer { } fn read_dgram( - socket: &mut UdpSocket, + socket: &UdpSocket, local_address: &SocketAddr, ) -> Result, io::Error> { let buf = &mut [0u8; 2048]; - let (sz, remote_addr) = match socket.recv_from(&mut buf[..]) { + let mut tos = 0; + let mut ttl = 0; + let (sz, remote_addr) = match recv_datagram(socket, &mut buf[..], &mut tos, &mut ttl) { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(None), Err(err) => { eprintln!("UDP recv error: {err:?}"); @@ -606,7 +601,13 @@ fn read_dgram( eprintln!("zero length datagram received?"); Ok(None) } else { - Ok(Some(Datagram::new(remote_addr, *local_address, &buf[..sz]))) + Ok(Some(Datagram::new( + remote_addr, + *local_address, + tos, + ttl, + &buf[..sz], + ))) } } @@ -650,12 +651,12 @@ impl ServersRunner { } for (i, host) in self.hosts.iter().enumerate() { - let socket = match UdpSocket::bind(host) { + let socket = match bind(host) { Err(err) => { eprintln!("Unable to bind UDP socket: {err}"); return Err(err); } - Ok(s) => s, + Ok(s) => UdpSocket::from_socket(s)?, }; let local_addr = match socket.local_addr() { @@ -723,7 +724,7 @@ impl ServersRunner { } /// Tries to find a socket, but then just falls back to sending from the first. - fn find_socket(&mut self, addr: SocketAddr) -> &mut UdpSocket { + fn find_socket(&mut self, addr: SocketAddr) -> &UdpSocket { let (first, rest) = self.sockets.split_first_mut().unwrap(); rest.iter_mut() .find(|s| { @@ -738,7 +739,9 @@ impl ServersRunner { match self.server.process(dgram, self.args.now()) { Output::Datagram(dgram) => { let socket = self.find_socket(dgram.source()); - emit_packet(socket, dgram); + if let Err(e) = emit_datagram(socket, &dgram) { + eprintln!("UDP write error: {}", e); + } true } Output::Callback(new_timeout) => { diff --git a/neqo-transport/Cargo.toml b/neqo-transport/Cargo.toml index a4da735a8a..ab97ff20b4 100644 --- a/neqo-transport/Cargo.toml +++ b/neqo-transport/Cargo.toml @@ -14,6 +14,7 @@ log = {version = "0.4.0", default-features = false} smallvec = "1.0.0" qlog = "0.11.0" indexmap = "1.0" +enum-map = "2.7.3" [dev-dependencies] test-fixture = { path = "../test-fixture" } diff --git a/neqo-transport/src/cid.rs b/neqo-transport/src/cid.rs index 38157419de..eefc3104a9 100644 --- a/neqo-transport/src/cid.rs +++ b/neqo-transport/src/cid.rs @@ -6,24 +6,23 @@ // Representation and management of connection IDs. -use crate::frame::FRAME_TYPE_NEW_CONNECTION_ID; -use crate::packet::PacketBuilder; -use crate::recovery::RecoveryToken; -use crate::stats::FrameStats; -use crate::{Error, Res}; +use crate::{ + frame::FRAME_TYPE_NEW_CONNECTION_ID, packet::PacketBuilder, recovery::RecoveryToken, + stats::FrameStats, Error, Res, +}; use neqo_common::{hex, hex_with_len, qinfo, Decoder, Encoder}; use neqo_crypto::random; use smallvec::SmallVec; -use std::borrow::Borrow; -use std::cell::{Ref, RefCell}; -use std::cmp::max; -use std::cmp::min; -use std::convert::AsRef; -use std::convert::TryFrom; -use std::ops::Deref; -use std::rc::Rc; +use std::{ + borrow::Borrow, + cell::{Ref, RefCell}, + cmp::{max, min}, + convert::{AsRef, TryFrom}, + ops::Deref, + rc::Rc, +}; pub const MAX_CONNECTION_ID_LEN: usize = 20; pub const LOCAL_ACTIVE_CID_LIMIT: usize = 8; @@ -88,8 +87,8 @@ impl + ?Sized> From<&T> for ConnectionId { } } -impl<'a> From<&ConnectionIdRef<'a>> for ConnectionId { - fn from(cidref: &ConnectionIdRef<'a>) -> Self { +impl<'a> From> for ConnectionId { + fn from(cidref: ConnectionIdRef<'a>) -> Self { Self::from(SmallVec::from(cidref.cid)) } } @@ -120,7 +119,7 @@ impl<'a> PartialEq> for ConnectionId { } } -#[derive(Hash, Eq, PartialEq)] +#[derive(Hash, Eq, PartialEq, Clone, Copy)] pub struct ConnectionIdRef<'a> { cid: &'a [u8], } @@ -324,6 +323,10 @@ impl ConnectionIdEntry { pub fn connection_id(&self) -> &ConnectionId { &self.cid } + + pub fn reset_token(&self) -> &SRT { + &self.srt + } } pub type RemoteConnectionIdEntry = ConnectionIdEntry<[u8; 16]>; @@ -340,8 +343,8 @@ impl ConnectionIdStore { self.cids.retain(|c| c.seqno != seqno); } - pub fn contains(&self, cid: &ConnectionIdRef) -> bool { - self.cids.iter().any(|c| &c.cid == cid) + pub fn contains(&self, cid: ConnectionIdRef) -> bool { + self.cids.iter().any(|c| c.cid == cid) } pub fn next(&mut self) -> Option> { @@ -479,7 +482,7 @@ impl ConnectionIdManager { } } - pub fn is_valid(&self, cid: &ConnectionIdRef) -> bool { + pub fn is_valid(&self, cid: ConnectionIdRef) -> bool { self.connection_ids.contains(cid) } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 3d7bc0a88c..930b3ac46a 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -1149,7 +1149,13 @@ impl Connection { /// part that we don't have keys for. fn save_datagram(&mut self, cspace: CryptoSpace, d: &Datagram, remaining: usize, now: Instant) { let d = if remaining < d.len() { - Datagram::new(d.source(), d.destination(), &d[d.len() - remaining..]) + Datagram::new( + d.source(), + d.destination(), + d.tos(), + d.ttl(), + &d[d.len() - remaining..], + ) } else { d.clone() }; @@ -1210,7 +1216,7 @@ impl Connection { dcid: Option<&ConnectionId>, now: Instant, ) -> Res { - if dcid.map_or(false, |d| d != packet.dcid()) { + if dcid.map_or(false, |d| d != &packet.dcid()) { self.stats .borrow_mut() .pkt_dropped("Coalesced packet has different DCID"); @@ -1266,7 +1272,7 @@ impl Connection { if versions.is_empty() || versions.contains(&self.version().wire_version()) || versions.contains(&0) - || packet.scid() != self.odcid().unwrap() + || &packet.scid() != self.odcid().unwrap() || matches!( self.address_validation, AddressValidationInfo::Retry { .. } @@ -1373,7 +1379,7 @@ impl Connection { self.handle_migration(path, d, migrate, now); } else if self.role != Role::Client && (packet.packet_type() == PacketType::Handshake - || (packet.dcid().len() >= 8 && packet.dcid() == &self.local_initial_source_cid)) + || (packet.dcid().len() >= 8 && packet.dcid() == self.local_initial_source_cid)) { // We only allow one path during setup, so apply handshake // path validation to this path. @@ -1445,7 +1451,16 @@ impl Connection { self.stats.borrow_mut().dups_rx += 1; } else { match self.process_packet(path, &payload, now) { - Ok(migrate) => self.postprocess_packet(path, d, &packet, migrate, now), + Ok(migrate) => { + // Since we processed frames from this IP packet now, + // update the ECN counts (RFC9000, Section 13.4.1). + if let Some(space) = self.acks.get_mut(space) { + space.inc_ecn_count(d.tos().into()) + } else { + qdebug!("Not tracking ECN for dropped packet number space"); + } + self.postprocess_packet(path, d, &packet, migrate, now) + } Err(e) => { self.ensure_error_path(path, &packet, now); return Err(e); @@ -2643,10 +2658,20 @@ impl Connection { ack_delay, first_ack_range, ack_ranges, + ect0_count, + ect1_count, + ce_count, } => { let ranges = Frame::decode_ack_frame(largest_acknowledged, first_ack_range, &ack_ranges)?; self.handle_ack(space, largest_acknowledged, ranges, ack_delay, now); + // TODO: Handle incoming ECN info. + qdebug!( + "input_frame ect0 {} ect1 {} ce {}", + ect0_count, + ect1_count, + ce_count + ); } Frame::Crypto { offset, data } => { qtrace!( diff --git a/neqo-transport/src/connection/tests/close.rs b/neqo-transport/src/connection/tests/close.rs index 6efbb6e24f..39b1106ce0 100644 --- a/neqo-transport/src/connection/tests/close.rs +++ b/neqo-transport/src/connection/tests/close.rs @@ -9,9 +9,8 @@ use super::{connect, connect_force_idle, default_client, default_server, send_so use crate::tparams::{self, TransportParameter}; use crate::{AppError, ConnectionError, Error, ERROR_APPLICATION_CLOSE}; -use neqo_common::Datagram; use std::time::Duration; -use test_fixture::{self, addr, now}; +use test_fixture::{self, datagram, now}; fn assert_draining(c: &Connection, expected: &Error) { assert!(c.state().closed()); @@ -201,6 +200,6 @@ fn stateless_reset_client() { .unwrap(); connect_force_idle(&mut client, &mut server); - client.process_input(&Datagram::new(addr(), addr(), vec![77; 21]), now()); + client.process_input(&datagram(vec![77; 21]), now()); assert_draining(&client, &Error::StatelessReset); } diff --git a/neqo-transport/src/connection/tests/handshake.rs b/neqo-transport/src/connection/tests/handshake.rs index 602611d34f..a0f48b9770 100644 --- a/neqo-transport/src/connection/tests/handshake.rs +++ b/neqo-transport/src/connection/tests/handshake.rs @@ -20,7 +20,7 @@ use crate::{ ConnectionError, ConnectionParameters, EmptyConnectionIdGenerator, Error, StreamType, Version, }; -use neqo_common::{event::Provider, qdebug, Datagram}; +use neqo_common::{event::Provider, qdebug, Datagram, IpTosEcn}; use neqo_crypto::{ constants::TLS_CHACHA20_POLY1305_SHA256, generate_ech_keys, AuthenticationStatus, }; @@ -615,7 +615,7 @@ fn corrupted_initial() { .find(|(_, &v)| v != 0) .unwrap(); corrupted[idx] ^= 0x76; - let dgram = Datagram::new(d.source(), d.destination(), corrupted); + let dgram = Datagram::new(d.source(), d.destination(), d.tos(), d.ttl(), corrupted); server.process_input(&dgram, now()); // The server should have received two packets, // the first should be dropped, the second saved. @@ -711,7 +711,7 @@ fn extra_initial_invalid_cid() { let mut copy = hs.to_vec(); assert_ne!(copy[5], 0); // The DCID should be non-zero length. copy[6] ^= 0xc4; - let dgram_copy = Datagram::new(hs.destination(), hs.source(), copy); + let dgram_copy = Datagram::new(hs.destination(), hs.source(), hs.tos(), hs.ttl(), copy); let nothing = client.process(Some(&dgram_copy), now).dgram(); assert!(nothing.is_none()); } @@ -806,6 +806,8 @@ fn anti_amplification() { #[cfg(not(feature = "fuzzing"))] #[test] fn garbage_initial() { + use test_fixture::datagram; + let mut client = default_client(); let mut server = default_server(); @@ -814,7 +816,7 @@ fn garbage_initial() { let mut corrupted = Vec::from(&initial[..initial.len() - 1]); corrupted.push(initial[initial.len() - 1] ^ 0xb7); corrupted.extend_from_slice(rest.as_ref().map_or(&[], |r| &r[..])); - let garbage = Datagram::new(addr(), addr(), corrupted); + let garbage = datagram(corrupted); assert_eq!(Output::None, server.process(Some(&garbage), now())); } @@ -832,6 +834,8 @@ fn drop_initial_packet_from_wrong_address() { let dgram = Datagram::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 2)), 443), p.destination(), + IpTosEcn::Ect0.into(), + 128, &p[..], ); @@ -858,6 +862,8 @@ fn drop_handshake_packet_from_wrong_address() { let dgram = Datagram::new( SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 2)), 443), p.destination(), + IpTosEcn::Ect0.into(), + 128, &p[..], ); diff --git a/neqo-transport/src/connection/tests/migration.rs b/neqo-transport/src/connection/tests/migration.rs index 7dd5e50d13..79c13faa77 100644 --- a/neqo-transport/src/connection/tests/migration.rs +++ b/neqo-transport/src/connection/tests/migration.rs @@ -30,7 +30,7 @@ use std::{ use test_fixture::{ self, addr, addr_v4, assertions::{assert_v4_path, assert_v6_path}, - fixture_init, now, + fixture_init, new_neqo_qlog, now, }; /// This should be a valid-seeming transport parameter. @@ -52,7 +52,7 @@ fn loopback() -> SocketAddr { } fn change_path(d: &Datagram, a: SocketAddr) -> Datagram { - Datagram::new(a, a, &d[..]) + Datagram::new(a, a, d.tos(), d.ttl(), &d[..]) } fn new_port(a: SocketAddr) -> SocketAddr { @@ -61,7 +61,13 @@ fn new_port(a: SocketAddr) -> SocketAddr { } fn change_source_port(d: &Datagram) -> Datagram { - Datagram::new(new_port(d.source()), d.destination(), &d[..]) + Datagram::new( + new_port(d.source()), + d.destination(), + d.tos(), + d.ttl(), + &d[..], + ) } /// As these tests use a new path, that path often has a non-zero RTT. @@ -374,7 +380,7 @@ fn migration(mut client: Connection) { let probe = client.process_output(now).dgram().unwrap(); assert_v4_path(&probe, true); // Contains PATH_CHALLENGE. assert_eq!(client.stats().frame_tx.path_challenge, 1); - let probe_cid = ConnectionId::from(&get_cid(&probe)); + let probe_cid = ConnectionId::from(get_cid(&probe)); let resp = server.process(Some(&probe), now).dgram().unwrap(); assert_v4_path(&resp, true); @@ -498,6 +504,7 @@ fn preferred_address(hs_client: SocketAddr, hs_server: SocketAddr, preferred: So }; fixture_init(); + let (log, _contents) = new_neqo_qlog(); let mut client = Connection::new_client( test_fixture::DEFAULT_SERVER_NAME, test_fixture::DEFAULT_ALPN, @@ -508,6 +515,7 @@ fn preferred_address(hs_client: SocketAddr, hs_server: SocketAddr, preferred: So now(), ) .unwrap(); + client.set_qlog(log); let spa = match preferred { SocketAddr::V6(v6) => PreferredAddress::new(None, Some(v6)), SocketAddr::V4(v4) => PreferredAddress::new(Some(v4), None), @@ -814,7 +822,7 @@ fn retire_all() { .unwrap(); connect_force_idle(&mut client, &mut server); - let original_cid = ConnectionId::from(&get_cid(&send_something(&mut client, now()))); + let original_cid = ConnectionId::from(get_cid(&send_something(&mut client, now()))); server.test_frame_writer = Some(Box::new(RetireAll { cid_gen })); let ncid = send_something(&mut server, now()); @@ -852,7 +860,7 @@ fn retire_prior_to_migration_failure() { .unwrap(); connect_force_idle(&mut client, &mut server); - let original_cid = ConnectionId::from(&get_cid(&send_something(&mut client, now()))); + let original_cid = ConnectionId::from(get_cid(&send_something(&mut client, now()))); client .migrate(Some(addr_v4()), Some(addr_v4()), false, now()) @@ -862,7 +870,7 @@ fn retire_prior_to_migration_failure() { let probe = client.process_output(now()).dgram().unwrap(); assert_v4_path(&probe, true); assert_eq!(client.stats().frame_tx.path_challenge, 1); - let probe_cid = ConnectionId::from(&get_cid(&probe)); + let probe_cid = ConnectionId::from(get_cid(&probe)); assert_ne!(original_cid, probe_cid); // Have the server receive the probe, but separately have it decide to @@ -907,7 +915,7 @@ fn retire_prior_to_migration_success() { .unwrap(); connect_force_idle(&mut client, &mut server); - let original_cid = ConnectionId::from(&get_cid(&send_something(&mut client, now()))); + let original_cid = ConnectionId::from(get_cid(&send_something(&mut client, now()))); client .migrate(Some(addr_v4()), Some(addr_v4()), false, now()) @@ -917,7 +925,7 @@ fn retire_prior_to_migration_success() { let probe = client.process_output(now()).dgram().unwrap(); assert_v4_path(&probe, true); assert_eq!(client.stats().frame_tx.path_challenge, 1); - let probe_cid = ConnectionId::from(&get_cid(&probe)); + let probe_cid = ConnectionId::from(get_cid(&probe)); assert_ne!(original_cid, probe_cid); // Have the server receive the probe, but separately have it decide to diff --git a/neqo-transport/src/connection/tests/mod.rs b/neqo-transport/src/connection/tests/mod.rs index a244efca53..b722feff78 100644 --- a/neqo-transport/src/connection/tests/mod.rs +++ b/neqo-transport/src/connection/tests/mod.rs @@ -30,7 +30,7 @@ use std::{ use neqo_common::{event::Provider, qdebug, qtrace, Datagram, Decoder, Role}; use neqo_crypto::{random, AllowZeroRtt, AuthenticationStatus, ResumptionToken}; -use test_fixture::{self, addr, fixture_init, now}; +use test_fixture::{self, addr, fixture_init, new_neqo_qlog, now}; // All the tests. mod ackrate; @@ -99,7 +99,8 @@ impl ConnectionIdGenerator for CountingConnectionIdGenerator { // These are a direct copy of those functions. pub fn new_client(params: ConnectionParameters) -> Connection { fixture_init(); - Connection::new_client( + let (log, _contents) = new_neqo_qlog(); + let mut client = Connection::new_client( test_fixture::DEFAULT_SERVER_NAME, test_fixture::DEFAULT_ALPN, Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), @@ -108,15 +109,18 @@ pub fn new_client(params: ConnectionParameters) -> Connection { params, now(), ) - .expect("create a default client") + .expect("create a default client"); + client.set_qlog(log); + client } + pub fn default_client() -> Connection { new_client(ConnectionParameters::default()) } pub fn new_server(params: ConnectionParameters) -> Connection { fixture_init(); - + let (log, _contents) = new_neqo_qlog(); let mut c = Connection::new_server( test_fixture::DEFAULT_KEYS, test_fixture::DEFAULT_ALPN, @@ -124,6 +128,7 @@ pub fn new_server(params: ConnectionParameters) -> Connection { params, ) .expect("create a default server"); + c.set_qlog(log); c.server_enable_0rtt(&test_fixture::anti_replay(), AllowZeroRtt {}) .expect("enable 0-RTT"); c diff --git a/neqo-transport/src/connection/tests/vn.rs b/neqo-transport/src/connection/tests/vn.rs index 4c00253642..b7e737e9d5 100644 --- a/neqo-transport/src/connection/tests/vn.rs +++ b/neqo-transport/src/connection/tests/vn.rs @@ -13,10 +13,10 @@ use crate::packet::PACKET_BIT_LONG; use crate::tparams::{self, TransportParameter}; use crate::{ConnectionParameters, Error, Version}; -use neqo_common::{event::Provider, Datagram, Decoder, Encoder}; +use neqo_common::{event::Provider, Decoder, Encoder}; use std::mem; use std::time::Duration; -use test_fixture::{self, addr, assertions, now}; +use test_fixture::{self, assertions, datagram, now}; // The expected PTO duration after the first Initial is sent. const INITIAL_PTO: Duration = Duration::from_millis(300); @@ -29,10 +29,7 @@ fn unknown_version() { let mut unknown_version_packet = vec![0x80, 0x1a, 0x1a, 0x1a, 0x1a]; unknown_version_packet.resize(1200, 0x0); - mem::drop(client.process( - Some(&Datagram::new(addr(), addr(), unknown_version_packet)), - now(), - )); + mem::drop(client.process(Some(&datagram(unknown_version_packet)), now())); assert_eq!(1, client.stats().dropped_rx); } @@ -44,10 +41,7 @@ fn server_receive_unknown_first_packet() { unknown_version_packet.resize(1200, 0x0); assert_eq!( - server.process( - Some(&Datagram::new(addr(), addr(), unknown_version_packet,)), - now(), - ), + server.process(Some(&datagram(unknown_version_packet)), now(),), Output::None ); @@ -86,7 +80,7 @@ fn version_negotiation_current_version() { &[0x1a1a_1a1a, Version::default().wire_version()], ); - let dgram = Datagram::new(addr(), addr(), vn); + let dgram = datagram(vn); let delay = client.process(Some(&dgram), now()).callback(); assert_eq!(delay, INITIAL_PTO); assert_eq!(*client.state(), State::WaitInitial); @@ -105,7 +99,7 @@ fn version_negotiation_version0() { let vn = create_vn(&initial_pkt, &[0, 0x1a1a_1a1a]); - let dgram = Datagram::new(addr(), addr(), vn); + let dgram = datagram(vn); let delay = client.process(Some(&dgram), now()).callback(); assert_eq!(delay, INITIAL_PTO); assert_eq!(*client.state(), State::WaitInitial); @@ -124,7 +118,7 @@ fn version_negotiation_only_reserved() { let vn = create_vn(&initial_pkt, &[0x1a1a_1a1a, 0x2a2a_2a2a]); - let dgram = Datagram::new(addr(), addr(), vn); + let dgram = datagram(vn); assert_eq!(client.process(Some(&dgram), now()), Output::None); match client.state() { State::Closed(err) => { @@ -146,7 +140,7 @@ fn version_negotiation_corrupted() { let vn = create_vn(&initial_pkt, &[0x1a1a_1a1a, 0x2a2a_2a2a]); - let dgram = Datagram::new(addr(), addr(), &vn[..vn.len() - 1]); + let dgram = datagram(vn[..vn.len() - 1].to_vec()); let delay = client.process(Some(&dgram), now()).callback(); assert_eq!(delay, INITIAL_PTO); assert_eq!(*client.state(), State::WaitInitial); @@ -165,7 +159,7 @@ fn version_negotiation_empty() { let vn = create_vn(&initial_pkt, &[]); - let dgram = Datagram::new(addr(), addr(), vn); + let dgram = datagram(vn); let delay = client.process(Some(&dgram), now()).callback(); assert_eq!(delay, INITIAL_PTO); assert_eq!(*client.state(), State::WaitInitial); @@ -183,7 +177,7 @@ fn version_negotiation_not_supported() { .to_vec(); let vn = create_vn(&initial_pkt, &[0x1a1a_1a1a, 0x2a2a_2a2a, 0xff00_0001]); - let dgram = Datagram::new(addr(), addr(), vn); + let dgram = datagram(vn); assert_eq!(client.process(Some(&dgram), now()), Output::None); match client.state() { State::Closed(err) => { @@ -206,7 +200,7 @@ fn version_negotiation_bad_cid() { initial_pkt[6] ^= 0xc4; let vn = create_vn(&initial_pkt, &[0x1a1a_1a1a, 0x2a2a_2a2a, 0xff00_0001]); - let dgram = Datagram::new(addr(), addr(), vn); + let dgram = datagram(vn); let delay = client.process(Some(&dgram), now()).callback(); assert_eq!(delay, INITIAL_PTO); assert_eq!(*client.state(), State::WaitInitial); @@ -311,7 +305,7 @@ fn version_negotiation_downgrade() { // Start the handshake and spoof a VN packet. let initial = client.process_output(now()).dgram().unwrap(); let vn = create_vn(&initial, &[DOWNGRADE.wire_version()]); - let dgram = Datagram::new(addr(), addr(), vn); + let dgram = datagram(vn); client.process_input(&dgram, now()); connect_fail( diff --git a/neqo-transport/src/frame.rs b/neqo-transport/src/frame.rs index 7eeba507bc..4d9ec436b1 100644 --- a/neqo-transport/src/frame.rs +++ b/neqo-transport/src/frame.rs @@ -22,7 +22,7 @@ pub type FrameType = u64; const FRAME_TYPE_PADDING: FrameType = 0x0; pub const FRAME_TYPE_PING: FrameType = 0x1; pub const FRAME_TYPE_ACK: FrameType = 0x2; -const FRAME_TYPE_ACK_ECN: FrameType = 0x3; +pub const FRAME_TYPE_ACK_ECN: FrameType = 0x3; pub const FRAME_TYPE_RESET_STREAM: FrameType = 0x4; pub const FRAME_TYPE_STOP_SENDING: FrameType = 0x5; pub const FRAME_TYPE_CRYPTO: FrameType = 0x6; @@ -108,6 +108,9 @@ pub enum Frame<'a> { ack_delay: u64, first_ack_range: u64, ack_ranges: Vec, + ect0_count: u64, + ect1_count: u64, + ce_count: u64, }, ResetStream { stream_id: StreamId, @@ -215,7 +218,7 @@ impl<'a> Frame<'a> { match self { Self::Padding => FRAME_TYPE_PADDING, Self::Ping => FRAME_TYPE_PING, - Self::Ack { .. } => FRAME_TYPE_ACK, // We don't do ACK ECN. + Self::Ack { .. } => FRAME_TYPE_ACK, Self::ResetStream { .. } => FRAME_TYPE_RESET_STREAM, Self::StopSending { .. } => FRAME_TYPE_STOP_SENDING, Self::Crypto { .. } => FRAME_TYPE_CRYPTO, @@ -422,17 +425,24 @@ impl<'a> Frame<'a> { } // Now check for the values for ACK_ECN. - if t == FRAME_TYPE_ACK_ECN { - dv(dec)?; - dv(dec)?; - dv(dec)?; - } + let (ect0_count, ect1_count, ce_count) = match t { + FRAME_TYPE_ACK_ECN => { + let ect0_count = dv(dec)?; + let ect1_count = dv(dec)?; + let ce_count = dv(dec)?; + (ect0_count, ect1_count, ce_count) + } + _ => (0, 0, 0), + }; Ok(Self::Ack { largest_acknowledged: la, ack_delay: ad, first_ack_range: fa, ack_ranges: arr, + ect0_count, + ect1_count, + ce_count, }) } FRAME_TYPE_STOP_SENDING => Ok(Self::StopSending { @@ -624,7 +634,10 @@ mod tests { largest_acknowledged: 0x1234, ack_delay: 0x1235, first_ack_range: 0x1236, - ack_ranges: ar, + ack_ranges: ar.clone(), + ect0_count: 0, + ect1_count: 0, + ce_count: 0, }; just_dec(&f, "025234523502523601020304"); @@ -634,10 +647,19 @@ mod tests { let mut dec = enc.as_decoder(); assert_eq!(Frame::decode(&mut dec).unwrap_err(), Error::NoMoreData); - // Try to parse ACK_ECN without ECN values + // Try to parse ACK_ECN with ECN values + let fe = Frame::Ack { + largest_acknowledged: 0x1234, + ack_delay: 0x1235, + first_ack_range: 0x1236, + ack_ranges: ar, + ect0_count: 1, + ect1_count: 2, + ce_count: 3, + }; let enc = Encoder::from_hex("035234523502523601020304010203"); let mut dec = enc.as_decoder(); - assert_eq!(Frame::decode(&mut dec).unwrap(), f); + assert_eq!(Frame::decode(&mut dec).unwrap(), fe); } #[test] diff --git a/neqo-transport/src/packet/mod.rs b/neqo-transport/src/packet/mod.rs index b8a2d96790..acd3b5b2be 100644 --- a/neqo-transport/src/packet/mod.rs +++ b/neqo-transport/src/packet/mod.rs @@ -673,13 +673,12 @@ impl<'a> PublicPacket<'a> { self.packet_type } - pub fn dcid(&self) -> &ConnectionIdRef<'a> { - &self.dcid + pub fn dcid(&self) -> ConnectionIdRef<'a> { + self.dcid } - pub fn scid(&self) -> &ConnectionIdRef<'a> { + pub fn scid(&self) -> ConnectionIdRef<'a> { self.scid - .as_ref() .expect("should only be called for long header packets") } diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs index 54849eee56..cd98a805f8 100644 --- a/neqo-transport/src/path.rs +++ b/neqo-transport/src/path.rs @@ -7,29 +7,31 @@ #![deny(clippy::pedantic)] #![allow(clippy::module_name_repetitions)] -use std::cell::RefCell; -use std::convert::TryFrom; -use std::fmt::{self, Display}; -use std::mem; -use std::net::{IpAddr, SocketAddr}; -use std::rc::Rc; -use std::time::{Duration, Instant}; - -use crate::ackrate::{AckRate, PeerAckDelay}; -use crate::cc::CongestionControlAlgorithm; -use crate::cid::{ConnectionId, ConnectionIdRef, ConnectionIdStore, RemoteConnectionIdEntry}; -use crate::frame::{ - FRAME_TYPE_PATH_CHALLENGE, FRAME_TYPE_PATH_RESPONSE, FRAME_TYPE_RETIRE_CONNECTION_ID, +use std::{ + cell::RefCell, + convert::TryFrom, + fmt::{self, Display}, + mem, + net::{IpAddr, SocketAddr}, + rc::Rc, + time::{Duration, Instant}, }; -use crate::packet::PacketBuilder; -use crate::recovery::RecoveryToken; -use crate::rtt::RttEstimate; -use crate::sender::PacketSender; -use crate::stats::FrameStats; -use crate::tracking::{PacketNumberSpace, SentPacket}; -use crate::{Error, Res}; - -use neqo_common::{hex, qdebug, qinfo, qlog::NeqoQlog, qtrace, Datagram, Encoder}; + +use crate::{ + ackrate::{AckRate, PeerAckDelay}, + cc::{CongestionControlAlgorithm, MAX_DATAGRAM_SIZE}, + cid::{ConnectionId, ConnectionIdRef, ConnectionIdStore, RemoteConnectionIdEntry}, + frame::{FRAME_TYPE_PATH_CHALLENGE, FRAME_TYPE_PATH_RESPONSE, FRAME_TYPE_RETIRE_CONNECTION_ID}, + packet::PacketBuilder, + recovery::RecoveryToken, + rtt::RttEstimate, + sender::PacketSender, + stats::FrameStats, + tracking::{PacketNumberSpace, SentPacket}, + Error, Res, +}; + +use neqo_common::{hex, qdebug, qinfo, qlog::NeqoQlog, qtrace, Datagram, Encoder, IpTosEcn}; use neqo_crypto::random; /// This is the MTU that we assume when using IPv6. @@ -537,6 +539,10 @@ pub struct Path { rtt: RttEstimate, /// A packet sender for the path, which includes congestion control and a pacer. sender: PacketSender, + /// The ECN marking to use for outgoing packets on this path. + ecn: IpTosEcn, + /// The IP TTL to use for outgoing packets on this path. + ttl: u8, /// The number of bytes received on this path. /// Note that this value might saturate on a long-lived connection, @@ -544,6 +550,8 @@ pub struct Path { received_bytes: usize, /// The number of bytes sent on this path. sent_bytes: usize, + /// The number of ECN-marked bytes sent on this path that were declared lost. + lost_ecn_bytes: usize, /// For logging of events. qlog: NeqoQlog, @@ -573,7 +581,10 @@ impl Path { challenge: None, rtt: RttEstimate::default(), sender, + ecn: IpTosEcn::Ect0, + ttl: 64, received_bytes: 0, + lost_ecn_bytes: 0, sent_bytes: 0, qlog, } @@ -664,7 +675,7 @@ impl Path { /// Set the remote connection ID based on the peer's choice. /// This is only valid during the handshake. - pub fn set_remote_cid(&mut self, cid: &ConnectionIdRef) { + pub fn set_remote_cid(&mut self, cid: ConnectionIdRef) { self.remote_cid .as_mut() .unwrap() @@ -695,7 +706,7 @@ impl Path { /// Make a datagram. pub fn datagram>>(&self, payload: V) -> Datagram { - Datagram::new(self.local, self.remote, payload) + Datagram::new(self.local, self.remote, self.ecn.into(), self.ttl, payload) } /// Get local address as `SocketAddr` @@ -983,6 +994,25 @@ impl Path { self.rtt.pto(space), // Important: the base PTO, not adjusted. lost_packets, ); + + if self.ecn == IpTosEcn::Ect0 { + // If the path is currently marking outgoing packets as ECT(0), + // update the count of lost ECN-marked bytes. + self.lost_ecn_bytes += lost_packets.iter().map(|p| p.size).sum::(); + + // If we lost more than 3 MTUs worth of ECN-marked bytes, then + // disable ECN on this path. See RFC 9000, Section 13.4.2. + // This doesn't quite implement the algorithm given in RFC 9000, + // Appendix A.4, but it should be OK. (It might be worthwhile caching + // destination IP addresses for paths on which we had to disable ECN, + // in order to not persitently delay connection establishment to + // those destinations.) + if self.lost_ecn_bytes > MAX_DATAGRAM_SIZE * 3 { + qinfo!([self], "Disabling ECN on path due to excessive loss"); + self.ecn = IpTosEcn::NotEct; + } + } + if cwnd_reduced { self.rtt.update_ack_delay(self.sender.cwnd(), self.mtu()); } diff --git a/neqo-transport/src/qlog.rs b/neqo-transport/src/qlog.rs index f5ca21ca40..ed94378efd 100644 --- a/neqo-transport/src/qlog.rs +++ b/neqo-transport/src/qlog.rs @@ -19,7 +19,7 @@ use qlog::events::{ AckedRanges, ErrorSpace, MetricsUpdated, PacketDropped, PacketHeader, PacketLost, PacketReceived, PacketSent, QuicFrame, StreamType, VersionInformation, }, - Event, EventData, RawInfo, + EventData, RawInfo, }; use neqo_common::{hex, qinfo, qlog::NeqoQlog, Decoder}; @@ -37,7 +37,7 @@ use crate::{ }; pub fn connection_tparams_set(qlog: &mut NeqoQlog, tph: &TransportParametersHandler) { - qlog.add_event(|| { + qlog.add_event_data(|| { let remote = tph.remote(); let ev_data = EventData::TransportParametersSet( qlog::events::quic::TransportParametersSet { @@ -61,20 +61,26 @@ pub fn connection_tparams_set(qlog: &mut NeqoQlog, tph: &TransportParametersHand max_udp_payload_size: Some(remote.get_integer(tparams::MAX_UDP_PAYLOAD_SIZE) as u32), ack_delay_exponent: Some(remote.get_integer(tparams::ACK_DELAY_EXPONENT) as u16), max_ack_delay: Some(remote.get_integer(tparams::MAX_ACK_DELAY) as u16), - // TODO(hawkinsw@obs.cr): We do not yet handle ACTIVE_CONNECTION_ID_LIMIT in tparams yet. - active_connection_id_limit: None, + active_connection_id_limit: Some(remote.get_integer(tparams::ACTIVE_CONNECTION_ID_LIMIT) as u32), initial_max_data: Some(remote.get_integer(tparams::INITIAL_MAX_DATA)), initial_max_stream_data_bidi_local: Some(remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL)), initial_max_stream_data_bidi_remote: Some(remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE)), initial_max_stream_data_uni: Some(remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_UNI)), initial_max_streams_bidi: Some(remote.get_integer(tparams::INITIAL_MAX_STREAMS_BIDI)), initial_max_streams_uni: Some(remote.get_integer(tparams::INITIAL_MAX_STREAMS_UNI)), - // TODO(hawkinsw@obs.cr): We do not yet handle PREFERRED_ADDRESS in tparams yet. - preferred_address: None, + preferred_address: remote.get_preferred_address().and_then(|(paddr, cid)| { + Some(qlog::events::quic::PreferredAddress { + ip_v4: paddr.ipv4()?.ip().to_string(), + ip_v6: paddr.ipv6()?.ip().to_string(), + port_v4: paddr.ipv4()?.port(), + port_v6: paddr.ipv6()?.port(), + connection_id: cid.connection_id().to_string(), + stateless_reset_token: hex(cid.reset_token()), + }) + }), }); - // This event occurs very early, so just mark the time as 0.0. - Some(Event::with_time(0.0, ev_data)) + Some(ev_data) }); } @@ -194,8 +200,8 @@ pub fn packet_sent( let mut d = Decoder::from(body); let header = PacketHeader::with_type(to_qlog_pkt_type(pt), Some(pn), None, None, None); let raw = RawInfo { - length: None, - payload_length: Some(plen as u64), + length: Some(plen as u64), + payload_length: None, data: None, }; @@ -229,18 +235,18 @@ pub fn packet_sent( }); } -pub fn packet_dropped(qlog: &mut NeqoQlog, payload: &PublicPacket) { +pub fn packet_dropped(qlog: &mut NeqoQlog, public_packet: &PublicPacket) { qlog.add_event_data(|| { let header = PacketHeader::with_type( - to_qlog_pkt_type(payload.packet_type()), + to_qlog_pkt_type(public_packet.packet_type()), None, None, None, None, ); let raw = RawInfo { - length: None, - payload_length: Some(payload.len() as u64), + length: Some(public_packet.len() as u64), + payload_length: None, data: None, }; @@ -290,8 +296,8 @@ pub fn packet_received( None, ); let raw = RawInfo { - length: None, - payload_length: Some(public_packet.len() as u64), + length: Some(public_packet.len() as u64), + payload_length: None, data: None, }; @@ -400,6 +406,9 @@ fn frame_to_qlogframe(frame: &Frame) -> QuicFrame { ack_delay, first_ack_range, ack_ranges, + ect0_count, + ect1_count, + ce_count, } => { let ranges = Frame::decode_ack_frame(*largest_acknowledged, *first_ack_range, ack_ranges).ok(); @@ -415,9 +424,9 @@ fn frame_to_qlogframe(frame: &Frame) -> QuicFrame { QuicFrame::Ack { ack_delay: Some(*ack_delay as f32 / 1000.0), acked_ranges, - ect1: None, - ect0: None, - ce: None, + ect1: Some(*ect0_count), + ect0: Some(*ect1_count), + ce: Some(*ce_count), } } Frame::ResetStream { diff --git a/neqo-transport/src/recovery.rs b/neqo-transport/src/recovery.rs index 23c296949d..0ea53a3309 100644 --- a/neqo-transport/src/recovery.rs +++ b/neqo-transport/src/recovery.rs @@ -412,7 +412,7 @@ impl LossRecoverySpace { .sent_packets .iter_mut() // BTreeMap iterates in order of ascending PN - .take_while(|(&k, _)| Some(k) < largest_acked) + .take_while(|(&k, _)| largest_acked.is_none() || Some(k) < largest_acked) { // Packets sent before now - loss_delay are deemed lost. if packet.time_sent + loss_delay <= now { @@ -430,7 +430,9 @@ impl LossRecoverySpace { largest_acked ); } else { - self.first_ooo_time = Some(packet.time_sent); + if largest_acked.is_some() { + self.first_ooo_time = Some(packet.time_sent); + } // No more packets can be declared lost after this one. break; }; diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 277f10e876..52c2be0c94 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -17,21 +17,25 @@ use neqo_crypto::{ use qlog::streamer::QlogStreamer; pub use crate::addr_valid::ValidateAddress; -use crate::addr_valid::{AddressValidation, AddressValidationResult}; -use crate::cid::{ConnectionId, ConnectionIdDecoder, ConnectionIdGenerator, ConnectionIdRef}; -use crate::connection::{Connection, Output, State}; -use crate::packet::{PacketBuilder, PacketType, PublicPacket}; -use crate::{ConnectionParameters, Res, Version}; - -use std::cell::RefCell; -use std::collections::{HashMap, HashSet, VecDeque}; -use std::fs::OpenOptions; -use std::mem; -use std::net::SocketAddr; -use std::ops::{Deref, DerefMut}; -use std::path::PathBuf; -use std::rc::{Rc, Weak}; -use std::time::{Duration, Instant}; +use crate::{ + addr_valid::{AddressValidation, AddressValidationResult}, + cid::{ConnectionId, ConnectionIdDecoder, ConnectionIdGenerator, ConnectionIdRef}, + connection::{Connection, Output, State}, + packet::{PacketBuilder, PacketType, PublicPacket}, + ConnectionParameters, Res, Version, +}; + +use std::{ + cell::RefCell, + collections::{HashMap, HashSet, VecDeque}, + fs::OpenOptions, + mem, + net::SocketAddr, + ops::{Deref, DerefMut}, + path::PathBuf, + rc::{Rc, Weak}, + time::{Duration, Instant}, +}; pub enum InitialResult { Accept, @@ -303,7 +307,7 @@ impl Server { out.dgram() } - fn connection(&self, cid: &ConnectionIdRef) -> Option { + fn connection(&self, cid: ConnectionIdRef) -> Option { self.connections.borrow().get(&cid[..]).map(Rc::clone) } @@ -345,7 +349,13 @@ impl Server { &initial.dst_cid, ); if let Ok(p) = packet { - let retry = Datagram::new(dgram.destination(), dgram.source(), p); + let retry = Datagram::new( + dgram.destination(), + dgram.source(), + dgram.tos(), + dgram.ttl(), + p, + ); Some(retry) } else { qerror!([self], "unable to encode retry, dropping packet"); @@ -383,7 +393,7 @@ impl Server { } } - fn create_qlog_trace(&self, odcid: &ConnectionIdRef<'_>) -> NeqoQlog { + fn create_qlog_trace(&self, odcid: ConnectionIdRef<'_>) -> NeqoQlog { if let Some(qlog_dir) = &self.qlog_dir { let mut qlog_path = qlog_dir.to_path_buf(); @@ -449,7 +459,7 @@ impl Server { c.set_retry_cids(odcid, initial.src_cid, initial.dst_cid); } c.set_validation(Rc::clone(&self.address_validation)); - c.set_qlog(self.create_qlog_trace(&attempt_key.odcid.as_cid_ref())); + c.set_qlog(self.create_qlog_trace(attempt_key.odcid.as_cid_ref())); if let Some(cfg) = &self.ech_config { if c.server_enable_ech(cfg.config, &cfg.public_name, &cfg.sk, &cfg.pk) .is_err() @@ -504,7 +514,7 @@ impl Server { qwarn!([self], "Unable to create connection"); if e == crate::Error::VersionNegotiation { crate::qlog::server_version_information_failed( - &mut self.create_qlog_trace(&attempt_key.odcid.as_cid_ref()), + &mut self.create_qlog_trace(attempt_key.odcid.as_cid_ref()), self.conn_params.get_versions().all(), initial.version.wire_version(), ) @@ -578,8 +588,8 @@ impl Server { qdebug!([self], "Unsupported version: {:x}", packet.wire_version()); let vn = PacketBuilder::version_negotiation( - packet.scid(), - packet.dcid(), + &packet.scid()[..], + &packet.dcid()[..], packet.wire_version(), self.conn_params.get_versions().all(), ); @@ -590,7 +600,13 @@ impl Server { packet.wire_version(), ); - return Some(Datagram::new(dgram.destination(), dgram.source(), vn)); + return Some(Datagram::new( + dgram.destination(), + dgram.source(), + dgram.tos(), + dgram.ttl(), + vn, + )); } match packet.packet_type() { diff --git a/neqo-transport/src/tracking.rs b/neqo-transport/src/tracking.rs index 32f1c8d1b7..9ace73760f 100644 --- a/neqo-transport/src/tracking.rs +++ b/neqo-transport/src/tracking.rs @@ -16,20 +16,22 @@ use std::{ time::{Duration, Instant}, }; -use neqo_common::{qdebug, qinfo, qtrace, qwarn}; +use neqo_common::{qdebug, qinfo, qtrace, qwarn, IpTosEcn}; use neqo_crypto::{Epoch, TLS_EPOCH_HANDSHAKE, TLS_EPOCH_INITIAL}; use crate::{ + frame::{FRAME_TYPE_ACK, FRAME_TYPE_ACK_ECN}, packet::{PacketBuilder, PacketNumber, PacketType}, recovery::RecoveryToken, stats::FrameStats, Error, Res, }; +use enum_map::{Enum, EnumMap}; use smallvec::{smallvec, SmallVec}; // TODO(mt) look at enabling EnumMap for this: https://stackoverflow.com/a/44905797/1375574 -#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq)] +#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq, Enum)] pub enum PacketNumberSpace { Initial, Handshake, @@ -382,6 +384,8 @@ pub struct RecvdPackets { /// Whether we are ignoring packets that arrive out of order /// for the purposes of generating immediate acknowledgment. ignore_order: bool, + /// The counts of different ECN marks that have been received. + ecn_count: EnumMap, } impl RecvdPackets { @@ -399,9 +403,15 @@ impl RecvdPackets { unacknowledged_count: 0, unacknowledged_tolerance: DEFAULT_ACK_PACKET_TOLERANCE, ignore_order: false, + ecn_count: EnumMap::default(), } } + /// Increase the ECN count for the given mark by one. + pub fn inc_ecn_count(&mut self, ecn: IpTosEcn) { + self.ecn_count[ecn] += 1; + } + /// Get the time at which the next ACK should be sent. pub fn ack_time(&self) -> Option { self.ack_time @@ -550,7 +560,7 @@ impl RecvdPackets { } } - /// Generate an ACK frame for this packet number space. + /// Generate an `ACK` or `ACK_ECN` frame for this packet number space. /// /// Unlike other frame generators this doesn't modify the underlying instance /// to track what has been sent. This only clears the delayed ACK timer. @@ -598,7 +608,15 @@ impl RecvdPackets { .cloned() .collect::>(); - builder.encode_varint(crate::frame::FRAME_TYPE_ACK); + let have_ecn_counts = self.ecn_count[IpTosEcn::Ect0] > 0 + || self.ecn_count[IpTosEcn::Ect1] > 0 + || self.ecn_count[IpTosEcn::Ce] > 0; + + builder.encode_varint(if have_ecn_counts { + FRAME_TYPE_ACK_ECN + } else { + FRAME_TYPE_ACK + }); let mut iter = ranges.iter(); let Some(first) = iter.next() else { return }; builder.encode_varint(first.largest); @@ -622,6 +640,12 @@ impl RecvdPackets { last = r.smallest; } + if have_ecn_counts { + builder.encode_varint(self.ecn_count[IpTosEcn::Ect0]); + builder.encode_varint(self.ecn_count[IpTosEcn::Ect1]); + builder.encode_varint(self.ecn_count[IpTosEcn::Ce]); + } + // We've sent an ACK, reset the timer. self.ack_time = None; self.last_ack_time = Some(now); diff --git a/neqo-transport/tests/conn_vectors.rs b/neqo-transport/tests/conn_vectors.rs index f088ebea3f..7597c81621 100644 --- a/neqo-transport/tests/conn_vectors.rs +++ b/neqo-transport/tests/conn_vectors.rs @@ -8,11 +8,10 @@ #![deny(clippy::pedantic)] #![cfg(not(feature = "fuzzing"))] -use neqo_common::Datagram; use neqo_transport::{ Connection, ConnectionParameters, RandomConnectionIdGenerator, State, Version, }; -use test_fixture::{self, addr, now}; +use test_fixture::{self, datagram, now}; use std::cell::RefCell; use std::rc::Rc; @@ -265,7 +264,7 @@ fn make_server(v: Version) -> Connection { fn process_client_initial(v: Version, packet: &[u8]) { let mut server = make_server(v); - let dgram = Datagram::new(addr(), addr(), packet); + let dgram = datagram(packet.to_vec()); assert_eq!(*server.state(), State::Init); let out = server.process(Some(&dgram), now()); assert_eq!(*server.state(), State::Handshaking); diff --git a/neqo-transport/tests/connection.rs b/neqo-transport/tests/connection.rs index 13c70590fa..b625751e6e 100644 --- a/neqo-transport/tests/connection.rs +++ b/neqo-transport/tests/connection.rs @@ -39,6 +39,8 @@ fn truncate_long_packet() { let truncated = Datagram::new( dupe.source(), dupe.destination(), + dupe.tos(), + dupe.ttl(), &dupe[..(dupe.len() - tail)], ); let hs_probe = client.process(Some(&truncated), now()).dgram(); @@ -60,8 +62,8 @@ fn truncate_long_packet() { /// Test that reordering parts of the server Initial doesn't change things. #[test] fn reorder_server_initial() { - // A simple ACK frame for a single packet with packet number 0. - const ACK_FRAME: &[u8] = &[0x02, 0x00, 0x00, 0x00, 0x00]; + // A simple ACK_ECN frame for a single packet with packet number 0 with a single ECT(0) mark. + const ACK_FRAME: &[u8] = &[0x03, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00]; let mut client = new_client( ConnectionParameters::default().versions(Version::Version1, vec![Version::Version1]), @@ -108,6 +110,8 @@ fn reorder_server_initial() { let reordered = Datagram::new( server_initial.source(), server_initial.destination(), + server_initial.tos(), + server_initial.ttl(), packet, ); @@ -182,6 +186,8 @@ fn overflow_crypto() { let dgram = Datagram::new( server_initial.source(), server_initial.destination(), + server_initial.tos(), + server_initial.ttl(), packet, ); client.process_input(&dgram, now()); diff --git a/neqo-transport/tests/retry.rs b/neqo-transport/tests/retry.rs index 0b51eacab1..3fffcba3da 100644 --- a/neqo-transport/tests/retry.rs +++ b/neqo-transport/tests/retry.rs @@ -21,7 +21,7 @@ use std::convert::TryFrom; use std::mem; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::time::Duration; -use test_fixture::{self, addr, assertions, default_client, now, split_datagram}; +use test_fixture::{self, assertions, datagram, default_client, now, split_datagram}; #[test] fn retry_basic() { @@ -150,7 +150,13 @@ fn retry_different_ip() { let dgram = dgram.unwrap(); let other_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)); let other_addr = SocketAddr::new(other_v4, 443); - let from_other = Datagram::new(other_addr, dgram.destination(), &dgram[..]); + let from_other = Datagram::new( + other_addr, + dgram.destination(), + dgram.tos(), + dgram.ttl(), + &dgram[..], + ); let dgram = server.process(Some(&from_other), now()).dgram(); assert!(dgram.is_none()); } @@ -171,7 +177,13 @@ fn new_token_different_ip() { // Now rewrite the source address. let d = dgram.unwrap(); let src = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), d.source().port()); - let dgram = Some(Datagram::new(src, d.destination(), &d[..])); + let dgram = Some(Datagram::new( + src, + d.destination(), + d.tos(), + d.ttl(), + &d[..], + )); let dgram = server.process(dgram.as_ref(), now()).dgram(); // Retry assert!(dgram.is_some()); assertions::assert_retry(dgram.as_ref().unwrap()); @@ -196,7 +208,13 @@ fn new_token_expired() { let the_future = now() + Duration::from_secs(60 * 60 * 24 * 30); let d = dgram.unwrap(); let src = SocketAddr::new(d.source().ip(), d.source().port() + 1); - let dgram = Some(Datagram::new(src, d.destination(), &d[..])); + let dgram = Some(Datagram::new( + src, + d.destination(), + d.tos(), + d.ttl(), + &d[..], + )); let dgram = server.process(dgram.as_ref(), the_future).dgram(); // Retry assert!(dgram.is_some()); assertions::assert_retry(dgram.as_ref().unwrap()); @@ -257,7 +275,13 @@ fn retry_bad_integrity() { let mut tweaked = retry.to_vec(); tweaked[retry.len() - 1] ^= 0x45; // damage the auth tag - let tweaked_packet = Datagram::new(retry.source(), retry.destination(), tweaked); + let tweaked_packet = Datagram::new( + retry.source(), + retry.destination(), + retry.tos(), + retry.ttl(), + tweaked, + ); // The client should ignore this packet. let dgram = client.process(Some(&tweaked_packet), now()).dgram(); @@ -338,7 +362,7 @@ fn vn_after_retry() { encoder.encode_vec(1, &client.odcid().unwrap()[..]); encoder.encode_vec(1, &[]); encoder.encode_uint(4, 0x5a5a_6a6a_u64); - let vn = Datagram::new(addr(), addr(), encoder); + let vn = datagram(encoder.into()); assert_ne!( client.process(Some(&vn), now()).callback(), @@ -425,6 +449,8 @@ fn mitm_retry() { let new_datagram = Datagram::new( client_initial2.source(), client_initial2.destination(), + client_initial2.tos(), + client_initial2.ttl(), notoken_packet, ); qdebug!("passing modified Initial to the main server"); diff --git a/neqo-transport/tests/server.rs b/neqo-transport/tests/server.rs index a4f07def87..2f1ee3b493 100644 --- a/neqo-transport/tests/server.rs +++ b/neqo-transport/tests/server.rs @@ -23,7 +23,7 @@ use neqo_transport::{ Connection, ConnectionError, ConnectionParameters, Error, Output, State, StreamType, Version, }; use test_fixture::{ - self, assertions, default_client, new_client, now, split_datagram, + self, assertions, datagram, default_client, new_client, now, split_datagram, CountingConnectionIdGenerator, }; @@ -157,6 +157,8 @@ fn duplicate_initial_new_path() { let other = Datagram::new( SocketAddr::new(initial.source().ip(), initial.source().port() ^ 23), initial.destination(), + initial.tos(), + initial.ttl(), &initial[..], ); @@ -235,7 +237,7 @@ fn drop_non_initial() { let mut bogus_data: Vec = header.into(); bogus_data.resize(1200, 66); - let bogus = Datagram::new(test_fixture::addr(), test_fixture::addr(), bogus_data); + let bogus = datagram(bogus_data); assert!(server.process(Some(&bogus), now()).dgram().is_none()); } @@ -254,7 +256,7 @@ fn drop_short_initial() { let mut bogus_data: Vec = header.into(); bogus_data.resize(1199, 66); - let bogus = Datagram::new(test_fixture::addr(), test_fixture::addr(), bogus_data); + let bogus = datagram(bogus_data); assert!(server.process(Some(&bogus), now()).dgram().is_none()); } @@ -371,7 +373,13 @@ fn new_token_different_port() { // Now rewrite the source port, which should not change that the token is OK. let d = dgram.unwrap(); let src = SocketAddr::new(d.source().ip(), d.source().port() + 1); - let dgram = Some(Datagram::new(src, d.destination(), &d[..])); + let dgram = Some(Datagram::new( + src, + d.destination(), + d.tos(), + d.ttl(), + &d[..], + )); let dgram = server.process(dgram.as_ref(), now()).dgram(); // Retry assert!(dgram.is_some()); assertions::assert_initial(dgram.as_ref().unwrap(), false); @@ -426,7 +434,13 @@ fn bad_client_initial() { &mut ciphertext, (header_enc.len() - 1)..header_enc.len(), ); - let bad_dgram = Datagram::new(dgram.source(), dgram.destination(), ciphertext); + let bad_dgram = Datagram::new( + dgram.source(), + dgram.destination(), + dgram.tos(), + dgram.ttl(), + ciphertext, + ); // The server should reject this. let response = server.process(Some(&bad_dgram), now()); @@ -474,7 +488,13 @@ fn version_negotiation_ignored() { let dgram = client.process(None, now()).dgram().expect("a datagram"); let mut input = dgram.to_vec(); input[1] ^= 0x12; - let damaged = Datagram::new(dgram.source(), dgram.destination(), input.clone()); + let damaged = Datagram::new( + dgram.source(), + dgram.destination(), + dgram.tos(), + dgram.ttl(), + input.clone(), + ); let vn = server.process(Some(&damaged), now()).dgram(); let mut dec = Decoder::from(&input[5..]); // Skip past version. diff --git a/qns/.dockerignore b/qns/.dockerignore index acdb180198..2f10ed7b44 100644 --- a/qns/.dockerignore +++ b/qns/.dockerignore @@ -1 +1,2 @@ .last-update-* +/target/ diff --git a/qns/Dockerfile b/qns/Dockerfile index d8da71b6b5..051cf5b8a5 100644 --- a/qns/Dockerfile +++ b/qns/Dockerfile @@ -1,8 +1,5 @@ FROM martenseemann/quic-network-simulator-endpoint:latest AS buildimage -# Which branch to build from. -ARG NEQO_BRANCH=main - RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates coreutils curl git make mercurial ssh \ build-essential clang llvm libclang-dev lld \ @@ -10,7 +7,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ && apt-get autoremove -y && apt-get clean -y \ && rm -rf /var/lib/apt/lists/* -ARG RUST_VERSION +ARG RUST_VERSION=stable ENV RUSTUP_HOME=/usr/local/rustup \ CARGO_HOME=/usr/local/cargo \ diff --git a/test-fixture/Cargo.toml b/test-fixture/Cargo.toml index 99bdd41cb5..dddabfbc2d 100644 --- a/test-fixture/Cargo.toml +++ b/test-fixture/Cargo.toml @@ -7,13 +7,14 @@ rust-version = "1.70.0" license = "MIT OR Apache-2.0" [dependencies] +lazy_static = "1.3.0" +log = {version = "0.4.0", default-features = false} neqo-common = { path = "../neqo-common" } neqo-crypto = { path = "../neqo-crypto" } neqo-http3 = { path = "../neqo-http3" } neqo-qpack = { path = "../neqo-qpack" } neqo-transport = { path = "../neqo-transport" } -log = {version = "0.4.0", default-features = false} -lazy_static = "1.3.0" +qlog = "0.11.0" [features] deny-warnings = [] diff --git a/test-fixture/src/lib.rs b/test-fixture/src/lib.rs index 5ddba24814..99356eeeda 100644 --- a/test-fixture/src/lib.rs +++ b/test-fixture/src/lib.rs @@ -7,7 +7,12 @@ #![cfg_attr(feature = "deny-warnings", deny(warnings))] #![warn(clippy::pedantic)] -use neqo_common::{event::Provider, hex, qtrace, Datagram, Decoder}; +use neqo_common::{ + event::Provider, + hex, + qlog::{new_trace, NeqoQlog}, + qtrace, Datagram, Decoder, IpTosEcn, Role, +}; use neqo_crypto::{init_db, random, AllowZeroRtt, AntiReplay, AuthenticationStatus}; use neqo_http3::{Http3Client, Http3Parameters, Http3Server}; @@ -16,13 +21,17 @@ use neqo_transport::{ ConnectionIdGenerator, ConnectionIdRef, ConnectionParameters, State, Version, }; +use qlog::{events::EventImportance, streamer::QlogStreamer}; + use std::{ cell::RefCell, cmp::max, convert::TryFrom, + io::{Cursor, Result, Write}, mem, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, rc::Rc, + sync::{Arc, Mutex}, time::{Duration, Instant}, }; @@ -84,6 +93,12 @@ pub fn addr() -> SocketAddr { SocketAddr::new(v6ip, 443) } +/// Create a default datagram with the given data. +#[must_use] +pub fn datagram(data: Vec) -> Datagram { + Datagram::new(addr(), addr(), IpTosEcn::Ect0.into(), 128, data) +} + /// An IPv4 version of the default socket address. #[must_use] pub fn addr_v4() -> SocketAddr { @@ -130,7 +145,8 @@ impl ConnectionIdGenerator for CountingConnectionIdGenerator { #[must_use] pub fn new_client(params: ConnectionParameters) -> Connection { fixture_init(); - Connection::new_client( + let (log, _contents) = new_neqo_qlog(); + let mut client = Connection::new_client( DEFAULT_SERVER_NAME, DEFAULT_ALPN, Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), @@ -139,7 +155,9 @@ pub fn new_client(params: ConnectionParameters) -> Connection { params.ack_ratio(255), // Tests work better with this set this way. now(), ) - .expect("create a client") + .expect("create a client"); + client.set_qlog(log); + client } /// Create a transport client with default configuration. @@ -166,7 +184,7 @@ pub fn default_server_h3() -> Connection { #[must_use] pub fn new_server(alpn: &[impl AsRef], params: ConnectionParameters) -> Connection { fixture_init(); - + let (log, _contents) = new_neqo_qlog(); let mut c = Connection::new_server( DEFAULT_KEYS, alpn, @@ -174,6 +192,7 @@ pub fn new_server(alpn: &[impl AsRef], params: ConnectionParameters) -> Con params.ack_ratio(255), ) .expect("create a server"); + c.set_qlog(log); c.server_enable_0rtt(&anti_replay(), AllowZeroRtt {}) .expect("enable 0-RTT"); c @@ -319,7 +338,60 @@ fn split_packet(buf: &[u8]) -> (&[u8], Option<&[u8]>) { pub fn split_datagram(d: &Datagram) -> (Datagram, Option) { let (a, b) = split_packet(&d[..]); ( - Datagram::new(d.source(), d.destination(), a), - b.map(|b| Datagram::new(d.source(), d.destination(), b)), + Datagram::new(d.source(), d.destination(), d.tos(), d.ttl(), a), + b.map(|b| Datagram::new(d.source(), d.destination(), d.tos(), d.ttl(), b)), ) } + +#[derive(Clone)] +pub struct SharedVec { + buf: Arc>>>, +} + +impl Write for SharedVec { + fn write(&mut self, buf: &[u8]) -> Result { + self.buf.lock().unwrap().write(buf) + } + fn flush(&mut self) -> Result<()> { + self.buf.lock().unwrap().flush() + } +} + +impl ToString for SharedVec { + fn to_string(&self) -> String { + String::from_utf8(self.buf.lock().unwrap().clone().into_inner()).unwrap() + } +} + +/// Returns a pair of new enabled `NeqoQlog` that is backed by a Vec together with a +/// `Cursor>` that can be used to read the contents of the log. +/// # Panics +/// Panics if the log cannot be created. +#[must_use] +pub fn new_neqo_qlog() -> (NeqoQlog, SharedVec) { + let mut trace = new_trace(Role::Client); + // Set reference time to 0.0 for testing. + trace.common_fields.as_mut().unwrap().reference_time = Some(0.0); + let buf = SharedVec { + buf: Arc::default(), + }; + let contents = buf.clone(); + let streamer = QlogStreamer::new( + qlog::QLOG_VERSION.to_string(), + None, + None, + None, + std::time::Instant::now(), + trace, + EventImportance::Base, + Box::new(buf), + ); + let log = NeqoQlog::enabled(streamer, ""); + (log.expect("to be able to write to new log"), contents) +} + +pub const EXPECTED_LOG_HEADER: &str = concat!( + "\u{1e}", + r#"{"qlog_version":"0.3","qlog_format":"JSON-SEQ","trace":{"vantage_point":{"name":"neqo-Client","type":"client"},"title":"neqo-Client trace","description":"Example qlog trace description","configuration":{"time_offset":0.0},"common_fields":{"reference_time":0.0,"time_format":"relative"}}}"#, + "\n" +);