From 9796b407ac102d79cc4dc237bc2c2911475d7d83 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 01/10] refactor(app/inbound): remove unused `Http` parameter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit this was not being flagged as an unused variable, due to the `#[instrument]` attribute. (😉 _it's used as a field in the generated span!_) `connect_timeout(..)` doesn't use its parameter. to address some deprecations, and avoid the need for polymorphism / refactoring related to http/1 and http/2 connections being represented as distinct types in the hyper 1.0 api, we remove it. Signed-off-by: katelyn martin --- linkerd/app/inbound/src/http/tests.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 77cc109ff6..69534fc15f 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -262,9 +262,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { // Build a mock connect that sleeps longer than the default inbound // connect timeout. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let server = hyper::server::conn::Http::new(); - let connect = support::connect().endpoint(Target::addr(), connect_timeout(server)); + let connect = support::connect().endpoint(Target::addr(), connect_timeout()); // Build a client using the connect that always sleeps so that responses // are GATEWAY_TIMEOUT. @@ -309,9 +307,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { // Build a mock connect that sleeps longer than the default inbound // connect timeout. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let server = hyper::server::conn::Http::new(); - let connect = support::connect().endpoint(Target::addr(), connect_timeout(server)); + let connect = support::connect().endpoint(Target::addr(), connect_timeout()); // Build a client using the connect that always sleeps so that responses // are GATEWAY_TIMEOUT. @@ -675,10 +671,7 @@ fn connect_error() -> impl Fn(Remote) -> io::Result { } #[tracing::instrument] -#[allow(deprecated)] // linkerd/linkerd2#8733 -fn connect_timeout( - http: hyper::server::conn::Http, -) -> Box) -> ConnectFuture + Send> { +fn connect_timeout() -> Box) -> ConnectFuture + Send> { Box::new(move |endpoint| { let span = tracing::info_span!("connect_timeout", ?endpoint); Box::pin( From 717b678ee97ae16c362bb71f74f16bc7cc3d7234 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 02/10] chore(app/inbound): address `server::conn::Http` deprecations this addresses hyper 1.0 deprecations in the server side of the inbound proxy's http unit test suite logic. see for more information. the client end of this change ends up being slightly involved, due to changes that will need to be made in `linkerd_app_test::http_util`. accordingly, those deprecations will be addressed in a subsequent commit. Signed-off-by: katelyn martin --- linkerd/app/inbound/src/http/tests.rs | 70 +++++++++++++-------------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 69534fc15f..7f49abedc5 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -12,7 +12,7 @@ use linkerd_app_core::{ errors::respond::L5D_PROXY_ERROR, identity, io, metrics, proxy::http, - svc::{self, NewService, Param}, + svc::{self, http::TracingExecutor, NewService, Param}, tls, transport::{ClientAddr, OrigDstAddr, Remote, ServerAddr}, NameAddr, ProxyRuntime, @@ -47,9 +47,7 @@ where #[tokio::test(flavor = "current_thread")] async fn unmeshed_http1_hello_world() { - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut server = hyper::server::conn::Http::new(); - server.http1_only(true); + let server = hyper::server::conn::http1::Builder::new(); #[allow(deprecated)] // linkerd/linkerd2#8733 let mut client = hyper::client::conn::Builder::new(); let _trace = trace_init(); @@ -88,9 +86,7 @@ async fn unmeshed_http1_hello_world() { #[tokio::test(flavor = "current_thread")] async fn downgrade_origin_form() { // Reproduces https://github.com/linkerd/linkerd2/issues/5298 - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut server = hyper::server::conn::Http::new(); - server.http1_only(true); + let server = hyper::server::conn::http1::Builder::new(); #[allow(deprecated)] // linkerd/linkerd2#8733 let mut client = hyper::client::conn::Builder::new(); client.http2_only(true); @@ -131,9 +127,7 @@ async fn downgrade_origin_form() { #[tokio::test(flavor = "current_thread")] async fn downgrade_absolute_form() { - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut server = hyper::server::conn::Http::new(); - server.http1_only(true); + let server = hyper::server::conn::http1::Builder::new(); #[allow(deprecated)] // linkerd/linkerd2#8733 let mut client = hyper::client::conn::Builder::new(); client.http2_only(true); @@ -523,9 +517,7 @@ async fn grpc_response_class() { // Build a mock connector serves a gRPC server that returns errors. let connect = { - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut server = hyper::server::conn::Http::new(); - server.http2_only(true); + let server = hyper::server::conn::http2::Builder::new(TracingExecutor); support::connect().endpoint_fn_boxed( Target::addr(), grpc_status_server(server, tonic::Code::Unknown), @@ -602,9 +594,8 @@ async fn grpc_response_class() { } #[tracing::instrument] -#[allow(deprecated)] // linkerd/linkerd2#8733 fn hello_server( - http: hyper::server::conn::Http, + server: hyper::server::conn::http1::Builder, ) -> impl Fn(Remote) -> io::Result { move |endpoint| { let span = tracing::info_span!("hello_server", ?endpoint); @@ -616,7 +607,8 @@ fn hello_server( Ok::<_, io::Error>(Response::new(Body::from("Hello world!"))) }); tokio::spawn( - http.serve_connection(server_io, hello_svc) + server + .serve_connection(server_io, hello_svc) .in_current_span(), ); Ok(io::BoxedIo::new(client_io)) @@ -626,7 +618,7 @@ fn hello_server( #[tracing::instrument] #[allow(deprecated)] // linkerd/linkerd2#8733 fn grpc_status_server( - http: hyper::server::conn::Http, + server: hyper::server::conn::http2::Builder, status: tonic::Code, ) -> impl Fn(Remote) -> io::Result { move |endpoint| { @@ -635,26 +627,30 @@ fn grpc_status_server( tracing::info!("mock connecting"); let (client_io, server_io) = support::io::duplex(4096); tokio::spawn( - http.serve_connection( - server_io, - hyper::service::service_fn(move |request: Request| async move { - tracing::info!(?request); - let (mut tx, rx) = Body::channel(); - tokio::spawn(async move { - let mut trls = ::http::HeaderMap::new(); - trls.insert("grpc-status", (status as u32).to_string().parse().unwrap()); - tx.send_trailers(trls).await - }); - Ok::<_, io::Error>( - http::Response::builder() - .version(::http::Version::HTTP_2) - .header("content-type", "application/grpc") - .body(rx) - .unwrap(), - ) - }), - ) - .in_current_span(), + server + .serve_connection( + server_io, + hyper::service::service_fn(move |request: Request| async move { + tracing::info!(?request); + let (mut tx, rx) = Body::channel(); + tokio::spawn(async move { + let mut trls = ::http::HeaderMap::new(); + trls.insert( + "grpc-status", + (status as u32).to_string().parse().unwrap(), + ); + tx.send_trailers(trls).await + }); + Ok::<_, io::Error>( + http::Response::builder() + .version(::http::Version::HTTP_2) + .header("content-type", "application/grpc") + .body(rx) + .unwrap(), + ) + }), + ) + .in_current_span(), ); Ok(io::BoxedIo::new(client_io)) } From 472163e2fe4e09d1bfddab2c95bb861268224b77 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 03/10] refactor(app/test): reorder and document test helpers this moves the public `connect_client(..)` function up to the top of the group of functions, and adds a documentation comment explaining its purpose. a todo comment is left noting that this can be refactored to use tokio's new `JoinSet` type. Signed-off-by: katelyn martin --- linkerd/app/test/src/http_util.rs | 81 +++++++++++++++++-------------- 1 file changed, 44 insertions(+), 37 deletions(-) diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index 1519a0bdfc..867875310f 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -14,43 +14,13 @@ use hyper::client::conn::{Builder as ClientBuilder, SendRequest}; type BoxServer = svc::BoxTcp; -async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle>) { - let (client_io, server_io) = io::duplex(4096); - let f = server - .ready() - .await - .expect("proxy server failed to become ready") - .call(server_io); - - let proxy = async move { - let res = f.await.map_err(Into::into); - drop(server); - tracing::debug!("dropped server"); - tracing::info!(?res, "proxy serve task complete"); - res.map(|_| ()) - } - .instrument(tracing::info_span!("proxy")); - (client_io, tokio::spawn(proxy)) -} - -#[allow(deprecated)] // linkerd/linkerd2#8733 -async fn connect_client( - client_settings: &mut ClientBuilder, - io: io::DuplexStream, -) -> (SendRequest, JoinHandle>) { - let (client, conn) = client_settings - .handshake(io) - .await - .expect("Client must connect"); - let client_bg = conn - .map(|res| { - tracing::info!(?res, "Client background complete"); - res.map_err(Into::into) - }) - .instrument(tracing::info_span!("client_bg")); - (client, tokio::spawn(client_bg)) -} - +/// Connects a client and server, running a proxy between them. +/// +/// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and +/// await a response, and (2) a [`Future`] representing background tasks. +// +// TODO(kate): this conglomeration of background tests could be replaced with a `JoinSet`, which +// has been introduced to tokio since this code was originally written. #[allow(deprecated)] // linkerd/linkerd2#8733 pub async fn connect_and_accept( client_settings: &mut ClientBuilder, @@ -73,6 +43,43 @@ pub async fn connect_and_accept( (client, bg) } +#[allow(deprecated)] // linkerd/linkerd2#8733 +async fn connect_client( + client_settings: &mut ClientBuilder, + io: io::DuplexStream, +) -> (SendRequest, JoinHandle>) { + let (client, conn) = client_settings + .handshake(io) + .await + .expect("Client must connect"); + let client_bg = conn + .map(|res| { + tracing::info!(?res, "Client background complete"); + res.map_err(Into::into) + }) + .instrument(tracing::info_span!("client_bg")); + (client, tokio::spawn(client_bg)) +} + +async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle>) { + let (client_io, server_io) = io::duplex(4096); + let f = server + .ready() + .await + .expect("proxy server failed to become ready") + .call(server_io); + + let proxy = async move { + let res = f.await.map_err(Into::into); + drop(server); + tracing::debug!("dropped server"); + tracing::info!(?res, "proxy serve task complete"); + res.map(|_| ()) + } + .instrument(tracing::info_span!("proxy")); + (client_io, tokio::spawn(proxy)) +} + /// Collects a request or response body, returning it as a [`String`]. pub async fn body_to_string(body: T) -> Result where From d74f9bed46bd3ce3b2487c2f32d4d55e9d37aa58 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 04/10] refactor(app/test): briefly defer `spawn()`ing tasks this is a small tweak, defering the call to `tokio::spawn(..)` to make using `JoinSet` easier. Signed-off-by: katelyn martin --- linkerd/app/test/src/http_util.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index 867875310f..7097547fe9 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -5,7 +5,6 @@ use crate::{ use futures::FutureExt; use hyper::{body::HttpBody, Body}; use std::future::Future; -use tokio::task::JoinHandle; use tower::{util::ServiceExt, Service}; use tracing::Instrument; @@ -30,11 +29,11 @@ pub async fn connect_and_accept( let (client_io, proxy) = run_proxy(server).await; let (client, client_bg) = connect_client(client_settings, client_io).await; let bg = async move { - proxy + tokio::spawn(proxy) .await .expect("proxy background task panicked") .map_err(ContextError::ctx("proxy background task failed"))?; - client_bg + tokio::spawn(client_bg) .await .expect("client background task panicked") .map_err(ContextError::ctx("client background task failed"))?; @@ -47,7 +46,7 @@ pub async fn connect_and_accept( async fn connect_client( client_settings: &mut ClientBuilder, io: io::DuplexStream, -) -> (SendRequest, JoinHandle>) { +) -> (SendRequest, impl Future>) { let (client, conn) = client_settings .handshake(io) .await @@ -58,10 +57,12 @@ async fn connect_client( res.map_err(Into::into) }) .instrument(tracing::info_span!("client_bg")); - (client, tokio::spawn(client_bg)) + (client, client_bg) } -async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle>) { +async fn run_proxy( + mut server: BoxServer, +) -> (io::DuplexStream, impl Future>) { let (client_io, server_io) = io::duplex(4096); let f = server .ready() @@ -77,7 +78,8 @@ async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 05/10] refactor(app/test): use `JoinSet` for background tasks this commit changes the `connect_and_accept(..)` helper function used in the proxy's unit tests, altering its logic such that it now runs background tasks inside of a `JoinSet`. Signed-off-by: katelyn martin --- linkerd/app/inbound/src/http/tests.rs | 59 +++++++++++++++++++++------ linkerd/app/test/src/http_util.rs | 30 +++++++------- 2 files changed, 63 insertions(+), 26 deletions(-) diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 7f49abedc5..e52d75bfaa 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -15,7 +15,7 @@ use linkerd_app_core::{ svc::{self, http::TracingExecutor, NewService, Param}, tls, transport::{ClientAddr, OrigDstAddr, Remote, ServerAddr}, - NameAddr, ProxyRuntime, + Error, NameAddr, ProxyRuntime, }; use linkerd_app_test::connect::ConnectFuture; use linkerd_tracing::test::trace_init; @@ -80,7 +80,12 @@ async fn unmeshed_http1_hello_world() { let body = http_util::body_to_string(rsp.into_body()).await.unwrap(); assert_eq!(body, "Hello world!"); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -122,7 +127,12 @@ async fn downgrade_origin_form() { let body = http_util::body_to_string(rsp.into_body()).await.unwrap(); assert_eq!(body, "Hello world!"); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -163,7 +173,12 @@ async fn downgrade_absolute_form() { let body = http_util::body_to_string(rsp.into_body()).await.unwrap(); assert_eq!(body, "Hello world!"); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -205,7 +220,12 @@ async fn http1_bad_gateway_meshed_response_error_header() { // logical error context is added. check_error_header(rsp.headers(), "server is not listening"); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -246,7 +266,12 @@ async fn http1_bad_gateway_unmeshed_response() { "response must not contain L5D_PROXY_ERROR header" ); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -291,7 +316,12 @@ async fn http1_connect_timeout_meshed_response_error_header() { // logical error context is added. check_error_header(rsp.headers(), "connect timed out after 1s"); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -334,7 +364,12 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { "response must not contain L5D_PROXY_ERROR header" ); - bg.await.expect("background task failed"); + // Wait for all of the background tasks to complete, panicking if any returned an error. + bg.join_all() + .await + .into_iter() + .collect::, Error>>() + .expect("background task failed"); } #[tokio::test(flavor = "current_thread")] @@ -376,7 +411,7 @@ async fn h2_response_meshed_error_header() { // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - let _ = bg.await; + let _ = bg.join_all().await; } #[tokio::test(flavor = "current_thread")] @@ -420,7 +455,7 @@ async fn h2_response_unmeshed_error_header() { // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - let _ = bg.await; + let _ = bg.join_all().await; } #[tokio::test(flavor = "current_thread")] @@ -463,7 +498,7 @@ async fn grpc_meshed_response_error_header() { // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - let _ = bg.await; + let _ = bg.join_all().await; } #[tokio::test(flavor = "current_thread")] @@ -508,7 +543,7 @@ async fn grpc_unmeshed_response_error_header() { // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - let _ = bg.await; + let _ = bg.join_all().await; } #[tokio::test(flavor = "current_thread")] diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index 7097547fe9..6430956956 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -5,6 +5,7 @@ use crate::{ use futures::FutureExt; use hyper::{body::HttpBody, Body}; use std::future::Future; +use tokio::task::JoinSet; use tower::{util::ServiceExt, Service}; use tracing::Instrument; @@ -16,29 +17,30 @@ type BoxServer = svc::BoxTcp; /// Connects a client and server, running a proxy between them. /// /// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and -/// await a response, and (2) a [`Future`] representing background tasks. -// -// TODO(kate): this conglomeration of background tests could be replaced with a `JoinSet`, which -// has been introduced to tokio since this code was originally written. +/// await a response, and (2) a [`JoinSet`] running background tasks. #[allow(deprecated)] // linkerd/linkerd2#8733 pub async fn connect_and_accept( client_settings: &mut ClientBuilder, server: BoxServer, -) -> (SendRequest, impl Future>) { +) -> (SendRequest, JoinSet>) { tracing::info!(settings = ?client_settings, "connecting client with"); let (client_io, proxy) = run_proxy(server).await; let (client, client_bg) = connect_client(client_settings, client_io).await; - let bg = async move { - tokio::spawn(proxy) + + let mut bg = tokio::task::JoinSet::new(); + bg.spawn(async move { + proxy .await - .expect("proxy background task panicked") - .map_err(ContextError::ctx("proxy background task failed"))?; - tokio::spawn(client_bg) + .map_err(ContextError::ctx("proxy background task failed")) + .map_err(Error::from) + }); + bg.spawn(async move { + client_bg .await - .expect("client background task panicked") - .map_err(ContextError::ctx("client background task failed"))?; - Ok(()) - }; + .map_err(ContextError::ctx("client background task failed")) + .map_err(Error::from) + }); + (client, bg) } From ca0a21a31a06e0f0ce124367ba95c04c9957995f Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 06/10] refactor(app/test): tweak proxy process this commit makes two minor changes to the `run_proxy()` helper function: * remove a frivolous `.map(|_| ())` on a result that already had a tuple `Ok` type. * use `ServiceExt::oneshot` for brevity. this should have no effect on the tests, we're just golfing things down a bit. Signed-off-by: katelyn martin --- linkerd/app/test/src/http_util.rs | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index 6430956956..572be827ec 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -6,7 +6,7 @@ use futures::FutureExt; use hyper::{body::HttpBody, Body}; use std::future::Future; use tokio::task::JoinSet; -use tower::{util::ServiceExt, Service}; +use tower::ServiceExt; use tracing::Instrument; #[allow(deprecated)] // linkerd/linkerd2#8733 @@ -63,21 +63,13 @@ async fn connect_client( } async fn run_proxy( - mut server: BoxServer, + server: BoxServer, ) -> (io::DuplexStream, impl Future>) { let (client_io, server_io) = io::duplex(4096); - let f = server - .ready() - .await - .expect("proxy server failed to become ready") - .call(server_io); - let proxy = async move { - let res = f.await.map_err(Into::into); - drop(server); - tracing::debug!("dropped server"); + let res = server.oneshot(server_io).await; tracing::info!(?res, "proxy serve task complete"); - res.map(|_| ()) + res } .instrument(tracing::info_span!("proxy")); From d5052e95366293e4fe02b33ce8f09f2ae20db788 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 07/10] refactor(app/test): hoist `instrument(..)` calls Signed-off-by: katelyn martin --- linkerd/app/test/src/http_util.rs | 43 +++++++++++++++++-------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index 572be827ec..bad8039516 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -28,18 +28,24 @@ pub async fn connect_and_accept( let (client, client_bg) = connect_client(client_settings, client_io).await; let mut bg = tokio::task::JoinSet::new(); - bg.spawn(async move { - proxy - .await - .map_err(ContextError::ctx("proxy background task failed")) - .map_err(Error::from) - }); - bg.spawn(async move { - client_bg - .await - .map_err(ContextError::ctx("client background task failed")) - .map_err(Error::from) - }); + bg.spawn( + async move { + proxy + .await + .map_err(ContextError::ctx("proxy background task failed")) + .map_err(Error::from) + } + .instrument(tracing::info_span!("proxy")), + ); + bg.spawn( + async move { + client_bg + .await + .map_err(ContextError::ctx("client background task failed")) + .map_err(Error::from) + } + .instrument(tracing::info_span!("client_bg")), + ); (client, bg) } @@ -53,12 +59,10 @@ async fn connect_client( .handshake(io) .await .expect("Client must connect"); - let client_bg = conn - .map(|res| { - tracing::info!(?res, "Client background complete"); - res.map_err(Into::into) - }) - .instrument(tracing::info_span!("client_bg")); + let client_bg = conn.map(|res| { + tracing::info!(?res, "Client background complete"); + res.map_err(Into::into) + }); (client, client_bg) } @@ -70,8 +74,7 @@ async fn run_proxy( let res = server.oneshot(server_io).await; tracing::info!(?res, "proxy serve task complete"); res - } - .instrument(tracing::info_span!("proxy")); + }; (client_io, proxy) } From a7951e10ac2ff9d73134853d031cc30a0d2bd66f Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 08/10] refactor(app/test): remove `run_proxy(..)` we've nudged this along well enough that this function can now reasonably be removed. Signed-off-by: katelyn martin --- linkerd/app/test/src/http_util.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index bad8039516..2880302f18 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -24,7 +24,12 @@ pub async fn connect_and_accept( server: BoxServer, ) -> (SendRequest, JoinSet>) { tracing::info!(settings = ?client_settings, "connecting client with"); - let (client_io, proxy) = run_proxy(server).await; + let (client_io, server_io) = io::duplex(4096); + let proxy = async move { + let res = server.oneshot(server_io).await; + tracing::info!(?res, "proxy serve task complete"); + res + }; let (client, client_bg) = connect_client(client_settings, client_io).await; let mut bg = tokio::task::JoinSet::new(); @@ -66,19 +71,6 @@ async fn connect_client( (client, client_bg) } -async fn run_proxy( - server: BoxServer, -) -> (io::DuplexStream, impl Future>) { - let (client_io, server_io) = io::duplex(4096); - let proxy = async move { - let res = server.oneshot(server_io).await; - tracing::info!(?res, "proxy serve task complete"); - res - }; - - (client_io, proxy) -} - /// Collects a request or response body, returning it as a [`String`]. pub async fn body_to_string(body: T) -> Result where From 15d1b09febcca1b9ba6f31074e68cf8da34694a3 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 09/10] refactor(app/test): remove `connect_client(..)` and once more, we remove a helper function that isn't doing quite so much work, and whose signature contains deprecated hyper 1.0 types. Signed-off-by: katelyn martin --- linkerd/app/test/src/http_util.rs | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index 2880302f18..cca25f412c 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -4,7 +4,6 @@ use crate::{ }; use futures::FutureExt; use hyper::{body::HttpBody, Body}; -use std::future::Future; use tokio::task::JoinSet; use tower::ServiceExt; use tracing::Instrument; @@ -30,7 +29,15 @@ pub async fn connect_and_accept( tracing::info!(?res, "proxy serve task complete"); res }; - let (client, client_bg) = connect_client(client_settings, client_io).await; + + let (client, conn) = client_settings + .handshake(client_io) + .await + .expect("Client must connect"); + let client_bg = conn.map(|res| { + tracing::info!(?res, "Client background complete"); + res.map_err(Error::from) + }); let mut bg = tokio::task::JoinSet::new(); bg.spawn( @@ -55,22 +62,6 @@ pub async fn connect_and_accept( (client, bg) } -#[allow(deprecated)] // linkerd/linkerd2#8733 -async fn connect_client( - client_settings: &mut ClientBuilder, - io: io::DuplexStream, -) -> (SendRequest, impl Future>) { - let (client, conn) = client_settings - .handshake(io) - .await - .expect("Client must connect"); - let client_bg = conn.map(|res| { - tracing::info!(?res, "Client background complete"); - res.map_err(Into::into) - }); - (client, client_bg) -} - /// Collects a request or response body, returning it as a [`String`]. pub async fn body_to_string(body: T) -> Result where From ae3cca22f0219533bd3b8f59b81fb0f1e852ddcf Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 10/10] refactor(app/test): consolidate anonymous futures we create `async move {}` blocks in multiple places, without any clear benefit. this commit consolidates them into one place: when we spawn a task onto the `JoinSet`. Signed-off-by: katelyn martin --- linkerd/app/test/src/http_util.rs | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index cca25f412c..91bb4f53d5 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -2,7 +2,6 @@ use crate::{ app_core::{svc, Error}, io, ContextError, }; -use futures::FutureExt; use hyper::{body::HttpBody, Body}; use tokio::task::JoinSet; use tower::ServiceExt; @@ -24,37 +23,31 @@ pub async fn connect_and_accept( ) -> (SendRequest, JoinSet>) { tracing::info!(settings = ?client_settings, "connecting client with"); let (client_io, server_io) = io::duplex(4096); - let proxy = async move { - let res = server.oneshot(server_io).await; - tracing::info!(?res, "proxy serve task complete"); - res - }; let (client, conn) = client_settings .handshake(client_io) .await .expect("Client must connect"); - let client_bg = conn.map(|res| { - tracing::info!(?res, "Client background complete"); - res.map_err(Error::from) - }); let mut bg = tokio::task::JoinSet::new(); bg.spawn( async move { - proxy + server + .oneshot(server_io) .await - .map_err(ContextError::ctx("proxy background task failed")) - .map_err(Error::from) + .map_err(ContextError::ctx("proxy background task failed"))?; + tracing::info!("proxy serve task complete"); + Ok(()) } .instrument(tracing::info_span!("proxy")), ); bg.spawn( async move { - client_bg - .await + conn.await .map_err(ContextError::ctx("client background task failed")) - .map_err(Error::from) + .map_err(Error::from)?; + tracing::info!("client background complete"); + Ok(()) } .instrument(tracing::info_span!("client_bg")), );