Skip to content

Commit

Permalink
chart a course through lifetime hell and mix async-await notation wit…
Browse files Browse the repository at this point in the history
…h functions returning streams
  • Loading branch information
danieleades committed Sep 13, 2019
1 parent 5962594 commit 200f598
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 73 deletions.
40 changes: 21 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ use crate::{
transport::{tar, Transport},
tty::TtyDecoder,
};
use futures::Stream;
use futures::stream::{StreamExt, TryStreamExt};
use futures::future::{FutureExt, TryFutureExt};
use futures::{
future::{FutureExt, TryFutureExt},
stream::{StreamExt, TryStreamExt},
Stream,
};
use hyper::{client::HttpConnector, Body, Client, Method, Uri};
#[cfg(feature = "tls")]
use hyper_openssl::HttpsConnector;
Expand Down Expand Up @@ -115,7 +117,7 @@ impl<'a, 'b> Image<'a, 'b> {
pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> {
self.docker
.stream_get(&format!("/images/{}/get", self.name)[..])
.map(|c| c.to_vec())
.map_ok(|c| c.to_vec())
}

/// Adds a tag to an image
Expand Down Expand Up @@ -165,18 +167,18 @@ impl<'a> Images<'a> {
Some((Body::from(bytes), tar())),
None::<iter::Empty<_>>,
)
.map(|r| {
.map_ok(|r| {
futures::stream::iter(
serde_json::Deserializer::from_slice(&r[..])
.into_iter::<Value>()
.collect::<Vec<_>>(),
)
.map_err(Error::from)
})
.flatten(),
) as Box<dyn Stream<Item = Value, Error = Error> + Send>,
.try_flatten(),
) as Box<dyn Stream<Item = Result<Value>> + Send>,
Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream())
as Box<dyn Stream<Item = Value, Error = Error> + Send>,
as Box<dyn Stream<Item = Result<Value>> + Send>,
}
}

Expand Down Expand Up @@ -259,7 +261,7 @@ impl<'a> Images<'a> {
pub fn import(
self,
mut tarball: Box<dyn Read>,
) -> impl Stream<Item = Result<Value>>{
) -> impl Stream<Item = Result<Value>> {
let mut bytes = Vec::new();

match tarball.read_to_end(&mut bytes) {
Expand Down Expand Up @@ -371,7 +373,7 @@ impl<'a, 'b> Container<'a, 'b> {
pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> {
self.docker
.stream_get(&format!("/containers/{}/export", self.id)[..])
.map(|c| c.to_vec())
.map_ok(|c| c.to_vec())
}

/// Returns a stream of stats specific to this container instance
Expand Down Expand Up @@ -546,7 +548,7 @@ impl<'a, 'b> Container<'a, 'b> {
));
FramedRead::new(chunk_stream, decoder)
})
.flatten_stream()
.try_flatten_stream()
}

/// Copy a file/folder from the container. The resulting stream is a tarball of the extracted
Expand Down Expand Up @@ -1119,12 +1121,12 @@ impl Docker {
.and_then(|v| serde_json::from_str::<T>(&v).map_err(Error::SerdeJsonError))
}

fn stream_post<'a, B, H>(
&'a self,
endpoint: &'a str,
fn stream_post<B, H>(
&self,
endpoint: &str,
body: Option<(B, Mime)>,
headers: Option<H>,
) -> impl Stream<Item = Result<hyper::Chunk>> + 'a
) -> impl Stream<Item = Result<hyper::Chunk>>
where
B: Into<Body> + 'static,
H: IntoIterator<Item = (&'static str, String)> + 'static,
Expand All @@ -1133,10 +1135,10 @@ impl Docker {
.stream_chunks(Method::POST, endpoint, body, headers)
}

fn stream_get<'a>(
&'a self,
endpoint: &'a str,
) -> impl Stream<Item = Result<hyper::Chunk>> + 'a {
fn stream_get(
&self,
endpoint: &str,
) -> impl Stream<Item = Result<hyper::Chunk>> {
self.transport
.stream_chunks::<Body, iter::Empty<_>>(Method::GET, endpoint, None, None)
}
Expand Down
18 changes: 9 additions & 9 deletions src/read.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::errors::Error;
use futures::{Async, Stream};
use crate::Result;
use futures::Stream;
use hyper::Chunk;
use std::{
cmp,
io::{self, Read},
pin::Pin,
task::{Context, Poll},
};
use tokio_io::AsyncRead;

Expand Down Expand Up @@ -34,7 +36,7 @@ pub struct StreamReader<S> {

impl<S> StreamReader<S>
where
S: Stream<Item = Chunk, Error = Error>,
S: Stream<Item = Result<Chunk>>,
{
#[inline]
pub fn new(stream: S) -> StreamReader<S> {
Expand All @@ -47,7 +49,7 @@ where

impl<S> Read for StreamReader<S>
where
S: Stream<Item = Chunk, Error = Error>,
S: Stream<Item = Result<Chunk>>,
{
fn read(
&mut self,
Expand Down Expand Up @@ -79,17 +81,17 @@ where
match self.stream.poll() {
// Polling stream yielded a Chunk that can be read from.
//
Ok(Async::Ready(Some(chunk))) => {
Ok(Poll::Ready(Some(chunk))) => {
self.state = ReadState::Ready(chunk, 0);

continue;
}
// Polling stream yielded EOF.
//
Ok(Async::Ready(None)) => return Ok(0),
Ok(Poll::Ready(None)) => return Ok(0),
// Stream could not be read from.
//
Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()),
Ok(Poll::Pending) => return Err(io::ErrorKind::WouldBlock.into()),
Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
}
}
Expand All @@ -101,5 +103,3 @@ where
}
}
}

impl<S> AsyncRead for StreamReader<S> where S: Stream<Item = Chunk, Error = Error> {}
89 changes: 48 additions & 41 deletions src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Transports for communicating with the docker daemon
use crate::{Error, Result};
use futures::stream::{Stream, StreamExt, TryStreamExt};
use futures::future::{FutureExt, TryFutureExt};
use futures::future::Either;
use futures::{
future::{FutureExt, TryFutureExt},
stream::{Stream, StreamExt, TryStreamExt},
};
use hyper::{
client::{Client, HttpConnector},
header, Body, Chunk, Method, Request, StatusCode,
Expand All @@ -17,7 +18,7 @@ use hyperlocal::Uri as DomainUri;
use mime::Mime;
use serde::{Deserialize, Serialize};
use serde_json;
use std::{fmt, iter};
use std::{fmt, future::Future, iter};
use tokio_io::{AsyncRead, AsyncWrite};

pub fn tar() -> Mime {
Expand Down Expand Up @@ -78,7 +79,8 @@ impl Transport {
let vec = Vec::new();

while let Some(piece) = self
.get_chunks(method, &endpoint, body, None::<iter::Empty<_>>).await?
.get_chunks(method, &endpoint, body, None::<iter::Empty<_>>)
.await?
.next()
.await
{
Expand All @@ -92,27 +94,28 @@ impl Transport {
}

/// Make a request and return a `Stream` of `Chunks` as they are returned.
pub(crate) fn stream_chunks<'a, B,H>(
&'a self,
pub(crate) fn stream_chunks<B, H>(
&self,
method: Method,
endpoint: &'a str,
endpoint: &str,
body: Option<(B, Mime)>,
headers: Option<H>,
) -> impl Stream<Item = Result<Chunk>> + 'a
) -> impl Stream<Item = Result<Chunk>>
where
B: Into<Body> + 'static,
H: IntoIterator<Item = (&'static str, String)> + 'static,
B: Into<Body>,
H: IntoIterator<Item = (&'static str, String)>,
{
self.get_chunks(method, endpoint, body, headers).try_flatten_stream()
self.get_chunks(method, endpoint, body, headers)
.try_flatten_stream()
}

async fn get_chunks<B, H>(
fn get_chunks<B, H>(
&self,
method: Method,
endpoint: &str,
body: Option<(B, Mime)>,
headers: Option<H>,
) -> Result<impl Stream<Item = Result<Chunk>>>
) -> impl Future<Output = Result<impl Stream<Item = Result<Chunk>>>>
where
B: Into<Body>,
H: IntoIterator<Item = (&'static str, String)>,
Expand All @@ -121,34 +124,38 @@ impl Transport {
.build_request(method, endpoint, body, headers, |_| ())
.expect("Failed to build request!");

let response = self.send_request(req).await?;
let response_future = self.send_request(req);

async {
let response = response_future.await?;

let status = response.status();
let status = response.status();

match status {
StatusCode::OK
| StatusCode::CREATED
| StatusCode::SWITCHING_PROTOCOLS
| StatusCode::NO_CONTENT => return Ok(response.into_body().map_err(Error::Hyper)),
_ => {
let vec = Vec::new();
while let Some(piece) = response.into_body().next().await {
match piece {
Ok(x) => vec.extend(x),
Err(e) => return Err(Error::Hyper(e)),
match status {
StatusCode::OK
| StatusCode::CREATED
| StatusCode::SWITCHING_PROTOCOLS
| StatusCode::NO_CONTENT => return Ok(response.into_body().map_err(Error::Hyper)),
_ => {
let vec = Vec::new();
while let Some(piece) = response.into_body().next().await {
match piece {
Ok(x) => vec.extend(x),
Err(e) => return Err(Error::Hyper(e)),
}
}
}
let body = String::from_utf8(vec).map_err(Error::Encoding)?;
let body = String::from_utf8(vec).map_err(Error::Encoding)?;

Err(Error::Fault {
code: status,
message: Self::get_error_message(&body).unwrap_or_else(|| {
status
.canonical_reason()
.unwrap_or_else(|| "unknown error code")
.to_owned()
}),
})
Err(Error::Fault {
code: status,
message: Self::get_error_message(&body).unwrap_or_else(|| {
status
.canonical_reason()
.unwrap_or_else(|| "unknown error code")
.to_owned()
}),
})
}
}
}
}
Expand Down Expand Up @@ -200,10 +207,10 @@ impl Transport {
}

/// Send the given request to the docker daemon and return a Future of the response.
async fn send_request(
fn send_request(
&self,
req: Request<hyper::Body>,
) -> Result<hyper::Response<Body>> {
) -> impl Future<Output = Result<hyper::Response<Body>>> {
let req = match self {
Transport::Tcp { ref client, .. } => client.request(req),
#[cfg(feature = "tls")]
Expand All @@ -212,7 +219,7 @@ impl Transport {
Transport::Unix { ref client, .. } => client.request(req),
};

req.await.map_err(Error::Hyper)
async { req.await.map_err(Error::Hyper) }
}

/// Makes an HTTP request, upgrading the connection to a TCP
Expand Down
9 changes: 5 additions & 4 deletions src/tty.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use crate::{Error, Result};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use bytes::BytesMut;
use std::task::Context;
use futures::stream::Stream;
use futures::{
io::{AsyncRead, AsyncWrite},
stream::Stream,
};
use log::trace;
use std::{
future::Future,
io::{self, Cursor},
pin::Pin,
task::Poll,
task::{Context, Poll},
};
use tokio_codec::Decoder;
use tokio_io::{AsyncRead, AsyncWrite};

#[derive(Debug)]
pub struct Chunk {
Expand Down

0 comments on commit 200f598

Please sign in to comment.