Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: impl AsyncSeek for BufReader/BufWriter #3491

Merged
merged 4 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 121 additions & 3 deletions tokio/src/io/util/buf_reader.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::io::util::DEFAULT_BUF_SIZE;
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};

use pin_project_lite::pin_project;
use std::io;
use std::io::{self, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp, fmt};
use std::{cmp, fmt, mem};

pin_project! {
/// The `BufReader` struct adds buffering to any reader.
Expand All @@ -30,6 +30,7 @@ pin_project! {
pub(super) buf: Box<[u8]>,
pub(super) pos: usize,
pub(super) cap: usize,
pub(super) seek_state: SeekState,
}
}

Expand All @@ -48,6 +49,7 @@ impl<R: AsyncRead> BufReader<R> {
buf: buffer.into_boxed_slice(),
pos: 0,
cap: 0,
seek_state: SeekState::Init,
}
}

Expand Down Expand Up @@ -141,6 +143,122 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
}
}

#[derive(Debug, Clone, Copy)]
pub(super) enum SeekState {
/// start_seek has not been called.
Init,
/// start_seek has been called, but poll_complete has not yet been called.
Start(SeekFrom),
/// Waiting for completion of the first poll_complete in the `n.checked_sub(remainder).is_none()` branch.
PendingOverflowed(i64),
/// Waiting for completion of poll_complete.
Pending,
}

/// Seek to an offset, in bytes, in the underlying reader.
///
/// The position used for seeking with `SeekFrom::Current(_)` is the
/// position the underlying reader would be at if the `BufReader` had no
/// internal buffer.
///
/// Seeking always discards the internal buffer, even if the seek position
/// would otherwise fall within it. This guarantees that calling
/// `.into_inner()` immediately after a seek yields the underlying reader
/// at the same position.
///
/// See [`AsyncSeek`] for more details.
///
/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
/// where `n` minus the internal buffer length overflows an `i64`, two
/// seeks will be performed instead of one. If the second seek returns
/// `Err`, the underlying reader will be left at the same position it would
/// have if you called `seek` with `SeekFrom::Current(0)`.
impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
// We needs to call seek operation multiple times.
// And we should always call both start_seek and poll_complete,
// as start_seek alone cannot guarantee that the operation will be completed.
// poll_complete receives a Context and returns a Poll, so it cannot be called
// inside start_seek.
*self.project().seek_state = SeekState::Start(pos);
Comment on lines +178 to +183
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another reason why this implementation is more complicated than the futures-rs one.

Ok(())
}

fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
let res = match mem::replace(self.as_mut().project().seek_state, SeekState::Init) {
SeekState::Init => {
// 1.x AsyncSeek recommends calling poll_complete before start_seek.
// We don't have to guarantee that the value returned by
// poll_complete called without start_seek is correct,
// so we'll return 0.
Comment on lines +190 to +193
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have to guarantee that the value returned by poll_complete called without start_seek is correct,

@carllerche @Darksonn Is this assumption correct? If not, are there rules I need to follow?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning Ok(0) if poll_complete is called before start_seek seems reasonable to me.

return Poll::Ready(Ok(0));
}
SeekState::Start(SeekFrom::Current(n)) => {
let remainder = (self.cap - self.pos) as i64;
// it should be safe to assume that remainder fits within an i64 as the alternative
// means we managed to allocate 8 exbibytes and that's absurd.
// But it's not out of the realm of possibility for some weird underlying reader to
// support seeking by i64::min_value() so we need to handle underflow when subtracting
// remainder.
if let Some(offset) = n.checked_sub(remainder) {
self.as_mut()
.get_pin_mut()
.start_seek(SeekFrom::Current(offset))?;
self.as_mut().get_pin_mut().poll_complete(cx)?
} else {
// seek backwards by our remainder, and then by the offset
self.as_mut()
.get_pin_mut()
.start_seek(SeekFrom::Current(-remainder))?;
if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() {
*self.as_mut().project().seek_state = SeekState::PendingOverflowed(n);
return Poll::Pending;
}

// https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676
self.as_mut().discard_buffer();

self.as_mut()
.get_pin_mut()
.start_seek(SeekFrom::Current(n))?;
self.as_mut().get_pin_mut().poll_complete(cx)?
}
}
SeekState::PendingOverflowed(n) => {
if self.as_mut().get_pin_mut().poll_complete(cx)?.is_pending() {
*self.as_mut().project().seek_state = SeekState::PendingOverflowed(n);
return Poll::Pending;
}

// https://github.com/rust-lang/rust/pull/61157#issuecomment-495932676
self.as_mut().discard_buffer();

self.as_mut()
.get_pin_mut()
.start_seek(SeekFrom::Current(n))?;
self.as_mut().get_pin_mut().poll_complete(cx)?
}
SeekState::Start(pos) => {
// Seeking with Start/End doesn't care about our buffer length.
self.as_mut().get_pin_mut().start_seek(pos)?;
self.as_mut().get_pin_mut().poll_complete(cx)?
}
SeekState::Pending => self.as_mut().get_pin_mut().poll_complete(cx)?,
};

match res {
Poll::Ready(res) => {
self.discard_buffer();
Poll::Ready(Ok(res))
}
Poll::Pending => {
*self.as_mut().project().seek_state = SeekState::Pending;
Poll::Pending
}
}
}
}

impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
fn poll_write(
self: Pin<&mut Self>,
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/io/util/buf_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
buf: rbuf,
pos,
cap,
seek_state: rseek_state,
},
buf: wbuf,
written,
seek_state: wseek_state,
} = b;

BufStream {
Expand All @@ -105,10 +107,12 @@ impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
inner,
buf: wbuf,
written,
seek_state: wseek_state,
},
buf: rbuf,
pos,
cap,
seek_state: rseek_state,
},
}
}
Expand Down
44 changes: 42 additions & 2 deletions tokio/src/io/util/buf_writer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::io::util::DEFAULT_BUF_SIZE;
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};

use pin_project_lite::pin_project;
use std::fmt;
use std::io::{self, Write};
use std::io::{self, SeekFrom, Write};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -34,6 +34,7 @@ pin_project! {
pub(super) inner: W,
pub(super) buf: Vec<u8>,
pub(super) written: usize,
pub(super) seek_state: Option<SeekFrom>
}
}

Expand All @@ -50,6 +51,7 @@ impl<W: AsyncWrite> BufWriter<W> {
inner,
buf: Vec::with_capacity(cap),
written: 0,
seek_state: None,
}
}

Expand Down Expand Up @@ -142,6 +144,44 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
}
}

/// Seek to the offset, in bytes, in the underlying writer.
///
/// Seeking always writes out the internal buffer before seeking.
impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
// We need to flush the internal buffer before seeking.
// It receives a `Context` and returns a `Poll`, so it cannot be called
// inside `start_seek`.
*self.project().seek_state = Some(pos);
Ok(())
}

fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
let pos = match self.seek_state {
Some(pos) => pos,
// 1.x AsyncSeek recommends calling poll_complete before start_seek.
// We don't have to guarantee that the value returned by
// poll_complete called without start_seek is correct,
// so we'll return 0.
None => return Poll::Ready(Ok(0)),
};

// Flush the internal buffer before seeking.
ready!(self.as_mut().flush_buf(cx))?;

let mut me = self.project();
if let Err(e) = me.inner.as_mut().start_seek(pos) {
*me.seek_state = None;
return Poll::Ready(Err(e));
}
let res = me.inner.poll_complete(cx);
if res.is_ready() {
*me.seek_state = None;
}
res
}
}

impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W> {
fn poll_read(
self: Pin<&mut Self>,
Expand Down
Loading