Skip to content

Commit

Permalink
Update http1 an h2 dispatchers to use the new body progress API.
Browse files Browse the repository at this point in the history
This is a WIP, and the tests are not yet updated.
  • Loading branch information
olix0r committed Apr 17, 2024
1 parent 961d221 commit 8c1cd34
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
22 changes: 18 additions & 4 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,24 @@ where
}

fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
self.conn.poll_flush(cx).map_err(|err| {
debug!("error writing: {}", err);
crate::Error::new_body_write(err)
})
if let Poll::Ready(res) = self.conn.poll_flush(cx) {
return Poll::Ready(res.map_err(|err| {
debug!("error writing: {}", err);
crate::Error::new_body_write(err)
}));
}

// Drop the body_rx if it's done.
if let (Some(body), clear_body) =
OptGuard::new(self.body_rx.as_mut()).guard_mut()
{
if let Poll::Ready(Err(e)) = body.poll_progress(cx) {
*clear_body = true;
return Poll::Ready(Err(crate::Error::new_user_body(e)));
}
}

Poll::Pending
}

fn close(&mut self) {
Expand Down
20 changes: 15 additions & 5 deletions src/proto/h2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,30 @@ where

if me.body_tx.capacity() == 0 {
loop {
match ready!(me.body_tx.poll_capacity(cx)) {
Some(Ok(0)) => {}
Some(Ok(_)) => break,
Some(Err(e)) => {
match me.body_tx.poll_capacity(cx) {
Poll::Ready(Some(Ok(0))) => continue,
Poll::Ready(Some(Ok(_))) => break,
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Err(crate::Error::new_body_write(e)))
}
None => {
Poll::Ready(None) => {
// None means the stream is no longer in a
// streaming state, we either finished it
// somehow, or the remote reset us.
return Poll::Ready(Err(crate::Error::new_body_write(
"send stream capacity unexpectedly closed",
)));
}
Poll::Pending => {
// If we're waiting for capacity, poll the body stream
// to see if it's been canceled.
return match me.stream.as_mut().poll_progress(cx) {
Poll::Ready(Err(e)) => {
Poll::Ready(Err(me.body_tx.on_user_err(e)))
}
_ => Poll::Pending,
};
}
}
}
} else if let Poll::Ready(reason) = me
Expand Down

0 comments on commit 8c1cd34

Please sign in to comment.