diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index e57f80991d..e1d7c94725 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -770,7 +770,7 @@ mod tests { fn empty_body_is_always_eos() { // If the initial body was empty, every clone should always return // `true` from `is_end_stream`. - let initial = ReplayBody::try_new(hyper::Body::empty(), 64 * 1024) + let initial = ReplayBody::try_new(BoxBody::empty(), 64 * 1024) .expect("empty body can't be too large"); assert!(initial.is_end_stream()); @@ -785,7 +785,7 @@ mod tests { async fn eos_only_when_fully_replayed() { // Test that each clone of a body is not EOS until the data has been // fully replayed. - let mut initial = ReplayBody::try_new(hyper::Body::from("hello world"), 64 * 1024) + let mut initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024) .expect("body must not be too large"); let mut replay = initial.clone(); @@ -829,6 +829,9 @@ mod tests { // initial body will complete, but the replay will immediately fail. let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); + // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via + // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. + // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. let (mut tx, body) = hyper::Body::channel(); let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large"); let mut replay = initial.clone(); @@ -859,6 +862,9 @@ mod tests { // cap, we allow the request to continue, but stop buffering. let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"); + // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via + // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. + // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. let (mut tx, body) = hyper::Body::channel(); let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large"); let mut replay = initial.clone(); @@ -889,11 +895,13 @@ mod tests { #[test] fn body_too_big() { let max_size = 8; - let mk_body = - |sz: usize| -> hyper::Body { (0..sz).map(|_| "x").collect::().into() }; + let mk_body = |sz: usize| -> BoxBody { + let s = (0..sz).map(|_| "x").collect::(); + BoxBody::new(s) + }; assert!( - ReplayBody::try_new(hyper::Body::empty(), max_size).is_ok(), + ReplayBody::try_new(BoxBody::empty(), max_size).is_ok(), "empty body is not too big" ); @@ -907,6 +915,9 @@ mod tests { "over-sized body is too big" ); + // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via + // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. + // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. let (_sender, body) = hyper::Body::channel(); assert!( ReplayBody::try_new(body, max_size).is_ok(), @@ -915,9 +926,13 @@ mod tests { } struct Test { + // Sends body data. tx: Tx, - initial: ReplayBody, - replay: ReplayBody, + /// The "initial" body. + initial: ReplayBody, + /// Replays the initial body. + replay: ReplayBody, + /// An RAII guard for the tracing subscriber. _trace: tracing::subscriber::DefaultGuard, } @@ -925,8 +940,11 @@ mod tests { impl Test { fn new() -> Self { - let (tx, body) = hyper::Body::channel(); - let initial = ReplayBody::try_new(body, 64 * 1024).expect("body too large"); + // TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via + // `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`. + // alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`. + let (tx, rx) = hyper::Body::channel(); + let initial = ReplayBody::try_new(BoxBody::new(rx), 64 * 1024).expect("body too large"); let replay = initial.clone(); Self { tx: Tx(tx),