Skip to content

Commit

Permalink
EncoderWriter: no longer adhere to ‘at most one write’
Browse files Browse the repository at this point in the history
The wording around Write::write method is changing with requirement
that it maps to ‘at most one write’ being removed¹.  With that, change
EncoderWriter::write so that it flushes entire output buffer at the
beginning and then proceeds to process new input.  This eliminates
returning Ok(0) which is effectively an error.

Also, change accounting for the occupied portion of the output buffer.
Rather than just having occupied length, track occupied range which
means moving data to front is no longer necessary.

¹ rust-lang/rust#107200
  • Loading branch information
mina86 committed Feb 2, 2023
1 parent 92e94d2 commit 38882eb
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 140 deletions.
4 changes: 2 additions & 2 deletions src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,10 +1249,10 @@ fn fill_rand_len<R: rand::Rng>(vec: &mut Vec<u8>, rng: &mut R, len: usize) {
}
}

fn prefixed_data<'i, 'd>(
fn prefixed_data<'i>(
input_with_prefix: &'i mut String,
prefix_len: usize,
data: &'d str,
data: &str,
) -> &'i str {
input_with_prefix.truncate(prefix_len);
input_with_prefix.push_str(data);
Expand Down
221 changes: 106 additions & 115 deletions src/write/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::engine::Engine;
use std::{
cmp, fmt, io,
io::{ErrorKind, Result},
};
use std::io::ErrorKind;
use std::{cmp, fmt, io};

pub(crate) const BUF_SIZE: usize = 1024;
/// The most bytes whose encoding will fit in `BUF_SIZE`
Expand Down Expand Up @@ -53,13 +51,6 @@ const MIN_ENCODE_CHUNK_SIZE: usize = 3;
///
/// It has some minor performance loss compared to encoding slices (a couple percent).
/// It does not do any heap allocation.
///
/// # Limitations
///
/// Owing to the specification of the `write` and `flush` methods on the `Write` trait and their
/// implications for a buffering implementation, these methods may not behave as expected. In
/// particular, calling `write_all` on this interface may fail with `io::ErrorKind::WriteZero`.
/// See the documentation of the `Write` trait implementation for further details.
pub struct EncoderWriter<'e, E: Engine, W: io::Write> {
engine: &'e E,
/// Where encoded data is written to. It's an Option as it's None immediately before Drop is
Expand All @@ -74,21 +65,27 @@ pub struct EncoderWriter<'e, E: Engine, W: io::Write> {
/// Buffer to encode into. May hold leftover encoded bytes from a previous write call that the underlying writer
/// did not write last time.
output: [u8; BUF_SIZE],
/// How much of `output` is occupied with encoded data that couldn't be written last time
output_occupied_len: usize,
/// Occupied portion of output.
///
/// Invariant for the range is that it’s either 0..0 or 0 ≤ start < end ≤
/// BUF_SIZE. This means that if the range is empty, it’s 0..0.
output_range: std::ops::Range<usize>,
/// panic safety: don't write again in destructor if writer panicked while we were writing to it
panicked: bool,
}

impl<'e, E: Engine, W: io::Write> fmt::Debug for EncoderWriter<'e, E, W> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let range = self.output_range.clone();
let truncated_len = range.len().min(5);
let truncated_range = range.start..range.start + truncated_len;
write!(
f,
"extra_input: {:?} extra_input_occupied_len:{:?} output[..5]: {:?} output_occupied_len: {:?}",
self.extra_input,
self.extra_input_occupied_len,
&self.output[0..5],
self.output_occupied_len
"extra_input: {:?} occupied output[..{}]: {:?} output_range: {:?}",
&self.extra_input[..self.extra_input_occupied_len],
truncated_len,
&self.output[truncated_range],
range,
)
}
}
Expand All @@ -102,7 +99,7 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
extra_input: [0u8; MIN_ENCODE_CHUNK_SIZE],
extra_input_occupied_len: 0,
output: [0u8; BUF_SIZE],
output_occupied_len: 0,
output_range: 0..0,
panicked: false,
}
}
Expand All @@ -123,7 +120,7 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
/// # Errors
///
/// The first error that is not of `ErrorKind::Interrupted` will be returned.
pub fn finish(&mut self) -> Result<W> {
pub fn finish(&mut self) -> io::Result<W> {
// If we could consume self in finish(), we wouldn't have to worry about this case, but
// finish() is retryable in the face of I/O errors, so we can't consume here.
if self.delegate.is_none() {
Expand All @@ -138,91 +135,96 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
}

/// Write any remaining buffered data to the delegate writer.
fn write_final_leftovers(&mut self) -> Result<()> {
fn write_final_leftovers(&mut self) -> io::Result<()> {
if self.delegate.is_none() {
// finish() has already successfully called this, and we are now in drop() with a None
// writer, so just no-op
return Ok(());
}

self.write_all_encoded_output()?;

if self.extra_input_occupied_len > 0 {
// Make sure output isn’t full so we can append to it.
if self.output_range.end == self.output.len() {
self.flush_all_output()?;
}

let encoded_len = self
.engine
.encode_slice(
&self.extra_input[..self.extra_input_occupied_len],
&mut self.output[..],
&mut self.output[self.output_range.end..],
)
.expect("buffer is large enough");

self.output_occupied_len = encoded_len;

self.write_all_encoded_output()?;

// write succeeded, do not write the encoding of extra again if finish() is retried
self.output_range.end += encoded_len;
self.extra_input_occupied_len = 0;
}

Ok(())
self.flush_all_output()
}

/// Write as much of the encoded output to the delegate writer as it will accept, and store the
/// leftovers to be attempted at the next write() call. Updates `self.output_occupied_len`.
/// Flushes output buffer to the delegate.
///
/// # Errors
/// Loops writing data to the delegate until output buffer is empty or
/// delegate returns an error. An `Ok(0)` return from the delegate is
/// treated as an error.
///
/// Errors from the delegate writer are returned. In the case of an error,
/// `self.output_occupied_len` will not be updated, as errors from `write` are specified to mean
/// that no write took place.
fn write_to_delegate(&mut self, current_output_len: usize) -> Result<()> {
self.panicked = true;
let res = self
.delegate
.as_mut()
.expect("Writer must be present")
.write(&self.output[..current_output_len]);
self.panicked = false;

res.map(|consumed| {
debug_assert!(consumed <= current_output_len);

if consumed < current_output_len {
self.output_occupied_len = current_output_len.checked_sub(consumed).unwrap();
// If we're blocking on I/O, the minor inefficiency of copying bytes to the
// start of the buffer is the least of our concerns...
// TODO Rotate moves more than we need to; copy_within now stable.
self.output.rotate_left(consumed);
} else {
self.output_occupied_len = 0;
/// Updates `output_range` accordingly.
fn flush_output(&mut self) -> Option<io::Result<usize>> {
if self.output_range.end == 0 {
return None;
}
loop {
match self.write_to_delegate(self.output_range.clone()) {
Ok(0) => break Some(Ok(0)),
Ok(n) if n >= self.output_range.len() => {
self.output_range = 0..0;
break None;
}
Ok(n) => self.output_range.start += n,
Err(err) => break Some(Err(err)),
}
})
}
}

/// Write all buffered encoded output. If this returns `Ok`, `self.output_occupied_len` is `0`.
///
/// This is basically write_all for the remaining buffered data but without the undesirable
/// abort-on-`Ok(0)` behavior.
///
/// # Errors
/// Flushes output buffer to the delegate ignoring interruptions.
///
/// Any error emitted by the delegate writer abort the write loop and is returned, unless it's
/// `Interrupted`, in which case the error is ignored and writes will continue.
fn write_all_encoded_output(&mut self) -> Result<()> {
while self.output_occupied_len > 0 {
let remaining_len = self.output_occupied_len;
match self.write_to_delegate(remaining_len) {
// try again on interrupts ala write_all
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
// other errors return
Err(e) => return Err(e),
// success no-ops because remaining length is already updated
Ok(_) => {}
};
/// Like [`Self::flush_output`] but ignores [`ErrorKind::Interrupted`]
/// errors and converts `Ok(0)` to [`ErrorKind::WriteZero`].
fn flush_all_output(&mut self) -> io::Result<()> {
if self.output_range.end == 0 {
return Ok(());
}
loop {
match self.write_to_delegate(self.output_range.clone()) {
Ok(0) => {
break Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write whole buffer",
))
}
Ok(n) if n >= self.output_range.len() => {
self.output_range = 0..0;
break Ok(());
}
Ok(n) => self.output_range.start += n,
Err(err) if err.kind() == ErrorKind::Interrupted => (),
Err(err) => break Err(err),
}
}
}

debug_assert_eq!(0, self.output_occupied_len);
Ok(())
/// Writes given range of output buffer to the delegate. Performs exactly
/// one write. Sets `panicked` to `true` if delegate panics.
fn write_to_delegate(&mut self, range: std::ops::Range<usize>) -> io::Result<usize> {
self.panicked = true;
let res = self
.delegate
.as_mut()
.expect("Encoder has already had finish() called")
.write(&self.output[range]);
self.panicked = false;
res
}

/// Unwraps this `EncoderWriter`, returning the base writer it writes base64 encoded output
Expand Down Expand Up @@ -262,38 +264,24 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {
/// # Errors
///
/// Any errors emitted by the delegate writer are returned.
fn write(&mut self, input: &[u8]) -> Result<usize> {
fn write(&mut self, input: &[u8]) -> io::Result<usize> {
if self.delegate.is_none() {
panic!("Cannot write more after calling finish()");
}

if input.is_empty() {
return Ok(0);
if let Some(res) = self.flush_output() {
return res;
}
debug_assert_eq!(0, self.output_range.len());

// The contract of `Write::write` places some constraints on this implementation:
// - a call to `write()` represents at most one call to a wrapped `Write`, so we can't
// iterate over the input and encode multiple chunks.
// - Errors mean that "no bytes were written to this writer", so we need to reset the
// internal state to what it was before the error occurred

// before reading any input, write any leftover encoded output from last time
if self.output_occupied_len > 0 {
let current_len = self.output_occupied_len;
return self
.write_to_delegate(current_len)
// did not read any input
.map(|_| 0);
if input.is_empty() {
return Ok(0);
}

debug_assert_eq!(0, self.output_occupied_len);

// how many bytes, if any, were read into `extra` to create a triple to encode
let mut extra_input_read_len = 0;
let mut input = input;

let orig_extra_len = self.extra_input_occupied_len;

let mut encoded_size = 0;
// always a multiple of MIN_ENCODE_CHUNK_SIZE
let mut max_input_len = MAX_INPUT_LEN;
Expand Down Expand Up @@ -322,8 +310,10 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {

input = &input[extra_input_read_len..];

// consider extra to be used up, since we encoded it
self.extra_input_occupied_len = 0;
// Note: Not updating self.extra_input_occupied_len yet. It’s
// going to be zeroed at the end of the function if we
// successfully write some data to delegate.

// don't clobber where we just encoded to
encoded_size = 4;
// and don't read more than can be encoded
Expand Down Expand Up @@ -367,29 +357,30 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {
&mut self.output[encoded_size..],
);

// not updating `self.output_occupied_len` here because if the below write fails, it should
// "never take place" -- the buffer contents we encoded are ignored and perhaps retried
// later, if the consumer chooses.

self.write_to_delegate(encoded_size)
// no matter whether we wrote the full encoded buffer or not, we consumed the same
// input
.map(|_| extra_input_read_len + input_chunks_to_encode_len)
.map_err(|e| {
// in case we filled and encoded `extra`, reset extra_len
self.extra_input_occupied_len = orig_extra_len;
// Not updating `self.output_range` here because if the write fails, it
// should "never take place" -- the buffer contents we encoded are
// ignored and perhaps retried later, if the consumer chooses.

e
})
self.write_to_delegate(0..encoded_size).map(|written| {
if written < encoded_size {
// Update output range to portion which is yet to be written.
self.output_range = written..encoded_size;
} else {
// Everything was written, leave output range empty.
debug_assert_eq!(0..0, self.output_range);
}
self.extra_input_occupied_len = 0;
extra_input_read_len + input_chunks_to_encode_len
})
}

/// Because this is usually treated as OK to call multiple times, it will *not* flush any
/// incomplete chunks of input or write padding.
/// # Errors
///
/// The first error that is not of [`ErrorKind::Interrupted`] will be returned.
fn flush(&mut self) -> Result<()> {
self.write_all_encoded_output()?;
fn flush(&mut self) -> io::Result<()> {
self.flush_all_output()?;
self.delegate
.as_mut()
.expect("Writer must be present")
Expand Down
Loading

0 comments on commit 38882eb

Please sign in to comment.