Skip to content

Commit

Permalink
Update hyper to 0.13.1
Browse files Browse the repository at this point in the history
  • Loading branch information
schrieveslaach committed Jan 3, 2020
1 parent 96fe64a commit 9f4c6c0
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 38 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ base64 = "0.11.0"
bytes = "0.4.12"
chrono = { version = "0.4.9", optional = true, features = ["serde"] }
flate2 = "1.0.13"
http = "0.1.19"
hyper = "0.13.0-alpha.4"
http = "0.2.0"
hyper = "0.13.1"
hyper-openssl = { version = "0.8.0-alpha.4", optional = true }
hyperlocal = { git="https://github.com/danieleades/hyperlocal", branch="update-hyper", optional = true }
log = "0.4.8"
Expand All @@ -36,13 +36,13 @@ url = "2.1.0"
futures_codec = "0.3.1"
futures-util = "0.3.1"
pin-project = "0.4.6"
tokio-io = "0.2.0-alpha.6"
tokio = "0.2.6"


[dev-dependencies]
env_logger = "0.7.1"
futures = "0.3.1"
tokio = "0.2.0-alpha.6"
tokio = { version = "0.2.6", features = ["macros"] }

[features]
default = ["chrono", "unix-socket", "tls"]
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,12 +975,12 @@ impl Docker {
pub fn host(host: Uri) -> Docker {
let tcp_host_str = format!(
"{}://{}:{}",
host.scheme_part().map(|s| s.as_str()).unwrap(),
host.scheme_str().unwrap(),
host.host().unwrap().to_owned(),
host.port_u16().unwrap_or(80)
);

match host.scheme_part().map(|s| s.as_str()) {
match host.scheme_str() {
#[cfg(feature = "unix-socket")]
Some("unix") => Docker {
transport: Transport::Unix {
Expand Down Expand Up @@ -1136,7 +1136,7 @@ impl Docker {
endpoint: impl AsRef<str> + 'a,
body: Option<(Body, Mime)>,
headers: Option<H>,
) -> impl Stream<Item = Result<hyper::Chunk>> + 'a
) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a
where
H: IntoIterator<Item = (&'static str, String)> + 'a,
{
Expand All @@ -1147,7 +1147,7 @@ impl Docker {
fn stream_get<'a>(
&'a self,
endpoint: impl AsRef<str> + Unpin + 'a,
) -> impl Stream<Item = Result<hyper::Chunk>> + 'a {
) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a {
let headers = Some(Vec::default());
self.transport
.stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers)
Expand Down
55 changes: 26 additions & 29 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use crate::{Error, Result};
use futures_util::{
io::{AsyncRead, AsyncWrite},
stream::Stream,
TryFutureExt, TryStreamExt,
TryFutureExt, StreamExt,
};
use hyper::{
body::Bytes,
client::{Client, HttpConnector},
header, Body, Chunk, Method, Request, StatusCode,
header, Body, Method, Request, StatusCode,
};
#[cfg(feature = "tls")]
use hyper_openssl::HttpsConnector;
Expand Down Expand Up @@ -79,12 +80,11 @@ impl Transport {
where
B: Into<Body>,
{
let chunk = self
.stream_chunks(method, endpoint, body, None::<iter::Empty<_>>)
.try_concat()
let body = self
.get_body(method, endpoint, body, None::<iter::Empty<_>>)
.await?;

let string = String::from_utf8(chunk.to_vec())?;
let bytes = hyper::body::to_bytes(body).await?;
let string = String::from_utf8(bytes.to_vec())?;

Ok(string)
}
Expand All @@ -101,7 +101,7 @@ impl Transport {
H: IntoIterator<Item = (&'static str, String)>,
{
let req = self
.build_request(method, endpoint, body, headers, |_| ())
.build_request(method, endpoint, body, headers, |builder| builder)
.expect("Failed to build request!");

let response = self.send_request(req).await?;
Expand All @@ -115,9 +115,8 @@ impl Transport {
| StatusCode::SWITCHING_PROTOCOLS
| StatusCode::NO_CONTENT => Ok(response.into_body()),
_ => {
let chunk = concat_chunks(response.into_body()).await?;

let message_body = String::from_utf8(chunk.into_bytes().into_iter().collect())?;
let bytes = hyper::body::to_bytes(response.into_body()).await?;
let message_body = String::from_utf8(bytes.to_vec())?;

Err(Error::Fault {
code: status,
Expand All @@ -138,7 +137,7 @@ impl Transport {
endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
headers: Option<H>,
) -> Result<impl Stream<Item = Result<Chunk>>>
) -> Result<impl Stream<Item = Result<Bytes>>>
where
B: Into<Body>,
H: IntoIterator<Item = (&'static str, String)>,
Expand All @@ -154,7 +153,7 @@ impl Transport {
endpoint: impl AsRef<str> + 'a,
body: Option<(B, Mime)>,
headers: Option<H>,
) -> impl Stream<Item = Result<Chunk>> + 'a
) -> impl Stream<Item = Result<Bytes>> + 'a
where
H: IntoIterator<Item = (&'static str, String)> + 'a,
B: Into<Body> + 'a,
Expand All @@ -164,20 +163,21 @@ impl Transport {
}

/// Builds an HTTP request.
fn build_request<B, H>(
fn build_request<B, H, F>(
&self,
method: Method,
endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
headers: Option<H>,
f: impl FnOnce(&mut ::http::request::Builder),
f: F,
) -> Result<Request<Body>>
where
B: Into<Body>,
H: IntoIterator<Item = (&'static str, String)>,
F: Fn(::http::request::Builder) -> ::http::request::Builder
{
let mut builder = Request::builder();
f(&mut builder);
builder = f(builder);

let req = match *self {
Transport::Tcp { ref host, .. } => {
Expand All @@ -193,15 +193,15 @@ impl Transport {
}
#[cfg(feature = "unix-socket")]
Transport::Unix { ref path, .. } => {
let uri: hyper::Uri = DomainUri::new(&path, endpoint.as_ref())?.into();
builder.method(method).uri(&uri.to_string())
let uri = DomainUri::new(&path, endpoint.as_ref());
builder.method(method).uri(uri)
}
};
let req = req.header(header::HOST, "");
let mut req = req.header(header::HOST, "");

if let Some(h) = headers {
for (k, v) in h.into_iter() {
req.header(k, v);
req = req.header(k, v);
}
}

Expand Down Expand Up @@ -252,8 +252,8 @@ impl Transport {
let req = self
.build_request(method, endpoint, body, None::<iter::Empty<_>>, |builder| {
builder
.header(header::CONNECTION, "Upgrade")
.header(header::UPGRADE, "tcp");
.header(header::CONNECTION.as_str(), "Upgrade")
.header(header::UPGRADE.as_str(), "tcp")
})
.expect("Failed to build request!");

Expand Down Expand Up @@ -296,7 +296,7 @@ struct Compat<S> {

impl<S> AsyncRead for Compat<S>
where
S: tokio_io::AsyncRead,
S: tokio::io::AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
Expand All @@ -309,7 +309,7 @@ where

impl<S> AsyncWrite for Compat<S>
where
S: tokio_io::AsyncWrite,
S: tokio::io::AsyncWrite,
{
fn poll_write(
self: Pin<&mut Self>,
Expand Down Expand Up @@ -337,8 +337,8 @@ struct ErrorResponse {
message: String,
}

fn stream_body(body: Body) -> impl Stream<Item = Result<Chunk>> {
async fn unfold(mut body: Body) -> Option<(Result<Chunk>, Body)> {
fn stream_body(body: Body) -> impl Stream<Item = Result<Bytes>> {
async fn unfold(mut body: Body) -> Option<(Result<Bytes>, Body)> {
let chunk_result = body.next().await?.map_err(Error::from);

Some((chunk_result, body))
Expand All @@ -347,6 +347,3 @@ fn stream_body(body: Body) -> impl Stream<Item = Result<Chunk>> {
futures_util::stream::unfold(body, unfold)
}

async fn concat_chunks(body: Body) -> Result<Chunk> {
stream_body(body).try_concat().await
}
2 changes: 1 addition & 1 deletion src/tty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ where

pub(crate) fn decode<S>(hyper_chunk_stream: S) -> impl Stream<Item = Result<TtyChunk>>
where
S: Stream<Item = Result<hyper::Chunk>> + Unpin,
S: Stream<Item = Result<hyper::body::Bytes>> + Unpin,
{
let stream = hyper_chunk_stream
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
Expand Down

0 comments on commit 9f4c6c0

Please sign in to comment.