Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 19 additions & 33 deletions linkerd/http/retry/src/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ struct SharedState<B> {
struct BodyState<B> {
buf: BufList,
trailers: Option<HeaderMap>,
rest: Option<B>,
rest: B,
is_completed: bool,

/// Maximum number of bytes to buffer.
Expand Down Expand Up @@ -115,7 +115,7 @@ impl<B: Body> ReplayBody<B> {
state: Some(BodyState {
buf: Default::default(),
trailers: None,
rest: Some(body),
rest: body,
is_completed: false,
max_bytes: max_bytes + 1,
}),
Expand Down Expand Up @@ -216,15 +216,8 @@ where
// that so that future clones will not try polling it again (as
// described above).
let mut data = {
// Get access to the initial body. If we don't have access to the
// inner body, there's no more work to do.
let rest = match state.rest.as_mut() {
Some(rest) => rest,
None => return Poll::Ready(None),
};

tracing::trace!("Polling initial body");
match futures::ready!(Pin::new(rest).poll_data(cx)) {
match futures::ready!(Pin::new(&mut state.rest).poll_data(cx)) {
Some(Ok(data)) => data,
Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
None => {
Expand Down Expand Up @@ -285,41 +278,37 @@ where
}
}

if let Some(rest) = state.rest.as_mut() {
// If the inner body has previously ended, don't poll it again.
if !rest.is_end_stream() {
let res = futures::ready!(Pin::new(rest).poll_trailers(cx)).map(|tlrs| {
if state.trailers.is_none() {
state.trailers.clone_from(&tlrs);
}
tlrs
});
return Poll::Ready(res.map_err(Into::into));
}
// If the inner body has previously ended, don't poll it again.
if !state.rest.is_end_stream() {
let res = futures::ready!(Pin::new(&mut state.rest).poll_trailers(cx)).map(|tlrs| {
if state.trailers.is_none() {
state.trailers.clone_from(&tlrs);
}
tlrs
});
return Poll::Ready(res.map_err(Into::into));
}

Poll::Ready(Ok(None))
}

fn is_end_stream(&self) -> bool {
// if the initial body was EOS as soon as it was wrapped, then we are
// empty.
// If the initial body was empty as soon as it was wrapped, then we are finished.
if self.shared.was_empty {
return true;
}

let is_inner_eos = self
.state
.as_ref()
.and_then(|state| state.rest.as_ref().map(Body::is_end_stream))
.unwrap_or(false);
let Some(state) = self.state.as_ref() else {
// This body is not currently the "active" replay being polled.
return false;
};

// if this body has data or trailers remaining to play back, it
// is not EOS
!self.replay_body && !self.replay_trailers
// if we have replayed everything, the initial body may
// still have data remaining, so ask it
&& is_inner_eos
&& state.rest.is_end_stream()
}

#[inline]
Expand All @@ -333,10 +322,7 @@ where
// Otherwise, if we're holding the state but have dropped the inner
// body, the entire body is buffered so we know the exact size hint.
let buffered = state.buf.remaining() as u64;
let rest_hint = match state.rest.as_ref() {
Some(rest) => rest.size_hint(),
None => return SizeHint::with_exact(buffered),
};
let rest_hint = state.rest.size_hint();

// Otherwise, add the inner body's size hint to the amount of buffered
// data. An upper limit is only set if the inner body has an upper
Expand Down
Loading