diff --git a/linkerd/app/inbound/Cargo.toml b/linkerd/app/inbound/Cargo.toml index 9e152aa587..d3f7acb529 100644 --- a/linkerd/app/inbound/Cargo.toml +++ b/linkerd/app/inbound/Cargo.toml @@ -49,6 +49,9 @@ hyper = { version = "0.14", features = ["deprecated", "http1", "http2"] } linkerd-app-test = { path = "../test" } arbitrary = { version = "1", features = ["derive"] } libfuzzer-sys = { version = "0.4", features = ["arbitrary-derive"] } +linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [ + "test-util", +] } [dev-dependencies] hyper = { version = "0.14", features = ["deprecated", "http1", "http2"] } diff --git a/linkerd/app/inbound/fuzz/Cargo.toml b/linkerd/app/inbound/fuzz/Cargo.toml index 9a49887f91..342d6429af 100644 --- a/linkerd/app/inbound/fuzz/Cargo.toml +++ b/linkerd/app/inbound/fuzz/Cargo.toml @@ -1,4 +1,3 @@ - [package] name = "linkerd-app-inbound-fuzz" version = "0.0.0" diff --git a/linkerd/app/inbound/src/http.rs b/linkerd/app/inbound/src/http.rs index 3de8c136d2..855c122ad0 100644 --- a/linkerd/app/inbound/src/http.rs +++ b/linkerd/app/inbound/src/http.rs @@ -18,7 +18,7 @@ pub mod fuzz { test_util::{support::connect::Connect, *}, Config, Inbound, }; - use hyper::{client::conn::Builder as ClientBuilder, Body, Request, Response}; + use hyper::{Body, Request, Response}; use libfuzzer_sys::arbitrary::Arbitrary; use linkerd_app_core::{ identity, io, @@ -41,9 +41,8 @@ pub mod fuzz { } pub async fn fuzz_entry_raw(requests: Vec) { - let mut server = hyper::server::conn::Http::new(); - server.http1_only(true); - let mut client = ClientBuilder::new(); + let server = hyper::server::conn::http1::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let connect = support::connect().endpoint_fn_boxed(Target::addr(), hello_fuzz_server(server)); let profiles = profile::resolver(); @@ -55,7 +54,7 @@ pub mod fuzz { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_fuzz_server(cfg, rt, profiles, connect).new_service(Target::HTTP1); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Now send all of the requests for inp in requests.iter() { @@ -74,14 +73,7 @@ pub mod fuzz { .header(header_name, header_value) .body(Body::default()) { - let rsp = client - .ready() - .await - .expect("HTTP client poll_ready failed") - .call(req) - .await - .expect("HTTP client request failed"); - tracing::info!(?rsp); + let rsp = client.send_request(req).await; tracing::info!(?rsp); if let Ok(rsp) = rsp { let body = http_util::body_to_string(rsp.into_body()).await; @@ -93,18 +85,18 @@ pub mod fuzz { } } - drop(client); // It's okay if the background task returns an error, as this would // indicate that the proxy closed the connection --- which it will do on // invalid inputs. We want to ensure that the proxy doesn't crash in the // face of these inputs, and the background task will panic in this // case. - let res = bg.await; + drop(client); + let res = bg.join_all().await; tracing::info!(?res, "background tasks completed") } fn hello_fuzz_server( - http: hyper::server::conn::Http, + http: hyper::server::conn::http1::Builder, ) -> impl Fn(Remote) -> io::Result { move |_endpoint| { let (client_io, server_io) = support::io::duplex(4096); @@ -235,6 +227,9 @@ pub mod fuzz { kind: "server".into(), name: "testsrv".into(), }), + local_rate_limit: Arc::new( + linkerd_proxy_server_policy::LocalRateLimit::default(), + ), }, ); policy diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 524971713d..3a86df8fde 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -48,8 +48,7 @@ where #[tokio::test(flavor = "current_thread")] async fn unmeshed_http1_hello_world() { let server = hyper::server::conn::http1::Builder::new(); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let _trace = trace_init(); // Build a mock "connector" that returns the upstream "server" IO. @@ -64,7 +63,7 @@ async fn unmeshed_http1_hello_world() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; let req = Request::builder() .method(http::Method::GET) @@ -72,7 +71,7 @@ async fn unmeshed_http1_hello_world() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -81,6 +80,7 @@ async fn unmeshed_http1_hello_world() { assert_eq!(body, "Hello world!"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -92,9 +92,7 @@ async fn unmeshed_http1_hello_world() { async fn downgrade_origin_form() { // Reproduces https://github.com/linkerd/linkerd2/issues/5298 let server = hyper::server::conn::http1::Builder::new(); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); - client.http2_only(true); + let client = hyper::client::conn::http2::Builder::new(TracingExecutor); let _trace = trace_init(); // Build a mock "connector" that returns the upstream "server" IO. @@ -109,7 +107,35 @@ async fn downgrade_origin_form() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = { + tracing::info!(settings = ?client, "connecting client with"); + let (client_io, server_io) = io::duplex(4096); + + let (client, conn) = client + .handshake(client_io) + .await + .expect("Client must connect"); + + let mut bg = tokio::task::JoinSet::new(); + bg.spawn( + async move { + server.oneshot(server_io).await?; + tracing::info!("proxy serve task complete"); + Ok(()) + } + .instrument(tracing::info_span!("proxy")), + ); + bg.spawn( + async move { + conn.await?; + tracing::info!("client background complete"); + Ok(()) + } + .instrument(tracing::info_span!("client_bg")), + ); + + (client, bg) + }; let req = Request::builder() .method(http::Method::GET) @@ -119,7 +145,7 @@ async fn downgrade_origin_form() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -128,6 +154,7 @@ async fn downgrade_origin_form() { assert_eq!(body, "Hello world!"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -137,10 +164,8 @@ async fn downgrade_origin_form() { #[tokio::test(flavor = "current_thread")] async fn downgrade_absolute_form() { + let client = hyper::client::conn::http2::Builder::new(TracingExecutor); let server = hyper::server::conn::http1::Builder::new(); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); - client.http2_only(true); let _trace = trace_init(); // Build a mock "connector" that returns the upstream "server" IO. @@ -155,7 +180,36 @@ async fn downgrade_absolute_form() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + + let (mut client, bg) = { + tracing::info!(settings = ?client, "connecting client with"); + let (client_io, server_io) = io::duplex(4096); + + let (client, conn) = client + .handshake(client_io) + .await + .expect("Client must connect"); + + let mut bg = tokio::task::JoinSet::new(); + bg.spawn( + async move { + server.oneshot(server_io).await?; + tracing::info!("proxy serve task complete"); + Ok(()) + } + .instrument(tracing::info_span!("proxy")), + ); + bg.spawn( + async move { + conn.await?; + tracing::info!("client background complete"); + Ok(()) + } + .instrument(tracing::info_span!("client_bg")), + ); + + (client, bg) + }; let req = Request::builder() .method(http::Method::GET) @@ -165,7 +219,7 @@ async fn downgrade_absolute_form() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -174,6 +228,7 @@ async fn downgrade_absolute_form() { assert_eq!(body, "Hello world!"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -190,8 +245,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { // Build a client using the connect that always errors so that responses // are BAD_GATEWAY. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -199,7 +253,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1()); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a BAD_GATEWAY with the expected // header message. @@ -221,6 +275,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { check_error_header(rsp.headers(), "server is not listening"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -237,8 +292,7 @@ async fn http1_bad_gateway_unmeshed_response() { // Build a client using the connect that always errors so that responses // are BAD_GATEWAY. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -246,7 +300,7 @@ async fn http1_bad_gateway_unmeshed_response() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a BAD_GATEWAY with the expected // header message. @@ -256,7 +310,7 @@ async fn http1_bad_gateway_unmeshed_response() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -267,6 +321,7 @@ async fn http1_bad_gateway_unmeshed_response() { ); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -285,8 +340,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { // Build a client using the connect that always sleeps so that responses // are GATEWAY_TIMEOUT. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -294,7 +348,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1()); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a GATEWAY_TIMEOUT with the // expected header message. @@ -304,7 +358,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -317,6 +371,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { check_error_header(rsp.headers(), "connect timed out after 1s"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -335,8 +390,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { // Build a client using the connect that always sleeps so that responses // are GATEWAY_TIMEOUT. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -344,7 +398,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a GATEWAY_TIMEOUT with the // expected header message. @@ -354,7 +408,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -365,6 +419,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { ); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index f675263b27..935907b895 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -13,12 +13,11 @@ type BoxServer = svc::BoxTcp; /// /// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and /// await a response, and (2) a [`JoinSet`] running background tasks. -#[allow(deprecated)] // linkerd/linkerd2#8733 -pub async fn connect_and_accept( - client_settings: &mut hyper::client::conn::Builder, +pub async fn connect_and_accept_http1( + client_settings: &mut hyper::client::conn::http1::Builder, server: BoxServer, ) -> ( - hyper::client::conn::SendRequest, + hyper::client::conn::http1::SendRequest, JoinSet>, ) { tracing::info!(settings = ?client_settings, "connecting client with");