From 1df826235c1f9b6cea5c6829fd75100e37525604 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 6 Mar 2024 23:19:13 +0000 Subject: [PATCH] chore: Use a fork of hyper with Body::poll_progress hyperium/http-body#90 proposes adding a `Body::poll_progress` method to the `Body` trait. This PR uses a fork of hyper that uses this proposed API when awaiting stream send capacity. This supports implementing timeouts on streams and connections in an unhealthy state to defend servers against resource exhaustion. --- .gitmodules | 6 ++++++ Cargo.lock | 4 ---- Cargo.toml | 9 +++++++++ http-body | 1 + hyper | 1 + hyper-balance/src/lib.rs | 5 +++++ linkerd/app/core/src/errors/respond.rs | 7 +++++++ linkerd/http/box/src/body.rs | 15 ++++++++++++++- linkerd/http/metrics/src/requests/service.rs | 11 +++++++++++ linkerd/http/retry/src/replay.rs | 15 +++++++++++++++ linkerd/http/retry/src/with_trailers.rs | 5 +++++ linkerd/proxy/http/src/classify/channel.rs | 5 +++++ linkerd/proxy/http/src/glue.rs | 1 + linkerd/proxy/http/src/orig_proto.rs | 9 +++++++++ linkerd/proxy/http/src/retain.rs | 5 +++++ linkerd/proxy/tap/src/service.rs | 5 +++++ 16 files changed, 99 insertions(+), 5 deletions(-) create mode 100644 .gitmodules create mode 160000 http-body create mode 160000 hyper diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..dbfceb6b1f --- /dev/null +++ b/.gitmodules @@ -0,0 +1,6 @@ +[submodule "http-body"] + path = http-body + url = git@github.com:olix0r/http-body.git +[submodule "hyper"] + path = hyper + url = git@github.com:olix0r/hyper.git diff --git a/Cargo.lock b/Cargo.lock index 5028f764c8..d28aa85573 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -830,8 +830,6 @@ dependencies = [ [[package]] name = "http-body" version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", "http", @@ -859,8 +857,6 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" version = "0.14.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", diff --git a/Cargo.toml b/Cargo.toml index cf64691988..e6622eefeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,6 +79,15 @@ members = [ "tools", ] +exclude = [ + "http-body", + "hyper", +] + [profile.release] debug = 1 lto = true + +[patch.crates-io] +http-body = { path = "http-body" } +hyper = { path = "hyper" } diff --git a/http-body b/http-body new file mode 160000 index 0000000000..30db493e9d --- /dev/null +++ b/http-body @@ -0,0 +1 @@ +Subproject commit 30db493e9d25bb576a978ddef7c0c728bdc01071 diff --git a/hyper b/hyper new file mode 160000 index 0000000000..542b48da09 --- /dev/null +++ b/hyper @@ -0,0 +1 @@ +Subproject commit 542b48da0906139744be3c8434927e9178839f46 diff --git a/hyper-balance/src/lib.rs b/hyper-balance/src/lib.rs index 4531212e32..01e5b67826 100644 --- a/hyper-balance/src/lib.rs +++ b/hyper-balance/src/lib.rs @@ -116,6 +116,11 @@ where Poll::Ready(ret) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().body.poll_progress(cx) + } + fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/app/core/src/errors/respond.rs b/linkerd/app/core/src/errors/respond.rs index 0b57fb36a6..a4abc30288 100644 --- a/linkerd/app/core/src/errors/respond.rs +++ b/linkerd/app/core/src/errors/respond.rs @@ -469,6 +469,13 @@ where } } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + ResponseBodyProj::Passthru(inner) => inner.poll_progress(cx), + ResponseBodyProj::GrpcRescue { inner, .. } => inner.poll_progress(cx), + } + } + #[inline] fn poll_trailers( self: Pin<&mut Self>, diff --git a/linkerd/http/box/src/body.rs b/linkerd/http/box/src/body.rs index 16f2ff4181..e149b5158d 100644 --- a/linkerd/http/box/src/body.rs +++ b/linkerd/http/box/src/body.rs @@ -58,6 +58,14 @@ impl Body for BoxBody { self.as_mut().inner.as_mut().poll_data(cx) } + #[inline] + fn poll_progress( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.as_mut().inner.as_mut().poll_progress(cx) + } + #[inline] fn poll_trailers( mut self: Pin<&mut Self>, @@ -116,12 +124,17 @@ where })) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_progress(cx).map_err(Into::into) + } + #[inline] fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>, Self::Error>> { - Poll::Ready(futures::ready!(self.project().0.poll_trailers(cx)).map_err(Into::into)) + self.project().0.poll_trailers(cx).map_err(Into::into) } #[inline] diff --git a/linkerd/http/metrics/src/requests/service.rs b/linkerd/http/metrics/src/requests/service.rs index ad1af7ecbc..9935c5fb88 100644 --- a/linkerd/http/metrics/src/requests/service.rs +++ b/linkerd/http/metrics/src/requests/service.rs @@ -283,6 +283,12 @@ where Poll::Ready(frame) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx) + } + + #[inline] fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -422,6 +428,11 @@ where Poll::Ready(frame) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx).map_err(Into::into) + } + fn poll_trailers( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index 662ec68429..96256c1b2e 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -251,6 +251,21 @@ where Poll::Ready(Some(Ok(Data::Initial(chunk)))) } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let state = Self::acquire_state(&mut this.state, &this.shared.body); + tracing::trace!("ReplayBody::poll_progress"); + + if let Some(rest) = state.rest.as_mut() { + // If the inner body has previously ended, don't poll it again. + if !rest.is_end_stream() { + return Pin::new(rest).poll_progress(cx).map_err(Into::into); + } + } + + Poll::Ready(Ok(())) + } + fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/http/retry/src/with_trailers.rs b/linkerd/http/retry/src/with_trailers.rs index eb92c855f1..39b62d6f54 100644 --- a/linkerd/http/retry/src/with_trailers.rs +++ b/linkerd/http/retry/src/with_trailers.rs @@ -140,6 +140,11 @@ where Pin::new(&mut this.inner).poll_data(cx) } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.inner).poll_progress(cx) + } + fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/proxy/http/src/classify/channel.rs b/linkerd/proxy/http/src/classify/channel.rs index 413887345b..3bb6bf87ae 100644 --- a/linkerd/proxy/http/src/classify/channel.rs +++ b/linkerd/proxy/http/src/classify/channel.rs @@ -232,6 +232,11 @@ where } } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx) + } + fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/proxy/http/src/glue.rs b/linkerd/proxy/http/src/glue.rs index 16c4dde464..352e0421fe 100644 --- a/linkerd/proxy/http/src/glue.rs +++ b/linkerd/proxy/http/src/glue.rs @@ -49,6 +49,7 @@ pub struct HyperConnectFuture { } // === impl UpgradeBody === +// Note: There's no poll_progress implementation in hyper::Body. impl HttpBody for UpgradeBody { type Data = Bytes; type Error = hyper::Error; diff --git a/linkerd/proxy/http/src/orig_proto.rs b/linkerd/proxy/http/src/orig_proto.rs index 858b7a8431..cc45444b00 100644 --- a/linkerd/proxy/http/src/orig_proto.rs +++ b/linkerd/proxy/http/src/orig_proto.rs @@ -203,6 +203,7 @@ impl HttpBody for UpgradeResponseBody { self.inner.is_end_stream() } + #[inline] fn poll_data( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -212,6 +213,14 @@ impl HttpBody for UpgradeResponseBody { .map_err(downgrade_h2_error) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(self.project().inner) + .poll_progress(cx) + .map_err(downgrade_h2_error) + } + + #[inline] fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/proxy/http/src/retain.rs b/linkerd/proxy/http/src/retain.rs index e5ed0f9deb..78be057058 100644 --- a/linkerd/proxy/http/src/retain.rs +++ b/linkerd/proxy/http/src/retain.rs @@ -113,6 +113,11 @@ impl http_body::Body for RetainBody { self.project().inner.poll_data(cx) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx) + } + #[inline] fn poll_trailers( self: Pin<&mut Self>, diff --git a/linkerd/proxy/tap/src/service.rs b/linkerd/proxy/tap/src/service.rs index fc70d5eee1..0013ce1cee 100644 --- a/linkerd/proxy/tap/src/service.rs +++ b/linkerd/proxy/tap/src/service.rs @@ -190,6 +190,11 @@ where } } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx) + } + fn poll_trailers( mut self: Pin<&mut Self>, cx: &mut Context<'_>,