diff --git a/src/engine/tests.rs b/src/engine/tests.rs index 906bba0..a5beef2 100644 --- a/src/engine/tests.rs +++ b/src/engine/tests.rs @@ -1249,11 +1249,7 @@ fn fill_rand_len(vec: &mut Vec, rng: &mut R, len: usize) { } } -fn prefixed_data<'i, 'd>( - input_with_prefix: &'i mut String, - prefix_len: usize, - data: &'d str, -) -> &'i str { +fn prefixed_data<'i>(input_with_prefix: &'i mut String, prefix_len: usize, data: &str) -> &'i str { input_with_prefix.truncate(prefix_len); input_with_prefix.push_str(data); input_with_prefix.as_str() diff --git a/src/write/encoder.rs b/src/write/encoder.rs index 1c19bb4..1412e2f 100644 --- a/src/write/encoder.rs +++ b/src/write/encoder.rs @@ -1,8 +1,6 @@ use crate::engine::Engine; -use std::{ - cmp, fmt, io, - io::{ErrorKind, Result}, -}; +use std::io::ErrorKind; +use std::{cmp, fmt, io}; pub(crate) const BUF_SIZE: usize = 1024; /// The most bytes whose encoding will fit in `BUF_SIZE` @@ -53,13 +51,6 @@ const MIN_ENCODE_CHUNK_SIZE: usize = 3; /// /// It has some minor performance loss compared to encoding slices (a couple percent). /// It does not do any heap allocation. -/// -/// # Limitations -/// -/// Owing to the specification of the `write` and `flush` methods on the `Write` trait and their -/// implications for a buffering implementation, these methods may not behave as expected. In -/// particular, calling `write_all` on this interface may fail with `io::ErrorKind::WriteZero`. -/// See the documentation of the `Write` trait implementation for further details. pub struct EncoderWriter<'e, E: Engine, W: io::Write> { engine: &'e E, /// Where encoded data is written to. It's an Option as it's None immediately before Drop is @@ -74,21 +65,27 @@ pub struct EncoderWriter<'e, E: Engine, W: io::Write> { /// Buffer to encode into. May hold leftover encoded bytes from a previous write call that the underlying writer /// did not write last time. output: [u8; BUF_SIZE], - /// How much of `output` is occupied with encoded data that couldn't be written last time - output_occupied_len: usize, + /// Occupied portion of output. + /// + /// Invariant for the range is that it’s either 0..0 or 0 ≤ start < end ≤ + /// BUF_SIZE. This means that if the range is empty, it’s 0..0. + output_range: std::ops::Range, /// panic safety: don't write again in destructor if writer panicked while we were writing to it panicked: bool, } impl<'e, E: Engine, W: io::Write> fmt::Debug for EncoderWriter<'e, E, W> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let range = self.output_range.clone(); + let truncated_len = range.len().min(5); + let truncated_range = range.start..range.start + truncated_len; write!( f, - "extra_input: {:?} extra_input_occupied_len:{:?} output[..5]: {:?} output_occupied_len: {:?}", - self.extra_input, - self.extra_input_occupied_len, - &self.output[0..5], - self.output_occupied_len + "extra_input: {:?} occupied output[..{}]: {:?} output_range: {:?}", + &self.extra_input[..self.extra_input_occupied_len], + truncated_len, + &self.output[truncated_range], + range, ) } } @@ -102,7 +99,7 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> { extra_input: [0u8; MIN_ENCODE_CHUNK_SIZE], extra_input_occupied_len: 0, output: [0u8; BUF_SIZE], - output_occupied_len: 0, + output_range: 0..0, panicked: false, } } @@ -123,7 +120,7 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> { /// # Errors /// /// The first error that is not of `ErrorKind::Interrupted` will be returned. - pub fn finish(&mut self) -> Result { + pub fn finish(&mut self) -> io::Result { // If we could consume self in finish(), we wouldn't have to worry about this case, but // finish() is retryable in the face of I/O errors, so we can't consume here. if self.delegate.is_none() { @@ -138,91 +135,96 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> { } /// Write any remaining buffered data to the delegate writer. - fn write_final_leftovers(&mut self) -> Result<()> { + fn write_final_leftovers(&mut self) -> io::Result<()> { if self.delegate.is_none() { // finish() has already successfully called this, and we are now in drop() with a None // writer, so just no-op return Ok(()); } - self.write_all_encoded_output()?; - if self.extra_input_occupied_len > 0 { + // Make sure output isn’t full so we can append to it. + if self.output_range.end == self.output.len() { + self.flush_all_output()?; + } + let encoded_len = self .engine .encode_slice( &self.extra_input[..self.extra_input_occupied_len], - &mut self.output[..], + &mut self.output[self.output_range.end..], ) .expect("buffer is large enough"); - self.output_occupied_len = encoded_len; - - self.write_all_encoded_output()?; - - // write succeeded, do not write the encoding of extra again if finish() is retried + self.output_range.end += encoded_len; self.extra_input_occupied_len = 0; } - Ok(()) + self.flush_all_output() } - /// Write as much of the encoded output to the delegate writer as it will accept, and store the - /// leftovers to be attempted at the next write() call. Updates `self.output_occupied_len`. + /// Flushes output buffer to the delegate. /// - /// # Errors + /// Loops writing data to the delegate until output buffer is empty or + /// delegate returns an error. An `Ok(0)` return from the delegate is + /// treated as an error. /// - /// Errors from the delegate writer are returned. In the case of an error, - /// `self.output_occupied_len` will not be updated, as errors from `write` are specified to mean - /// that no write took place. - fn write_to_delegate(&mut self, current_output_len: usize) -> Result<()> { - self.panicked = true; - let res = self - .delegate - .as_mut() - .expect("Writer must be present") - .write(&self.output[..current_output_len]); - self.panicked = false; - - res.map(|consumed| { - debug_assert!(consumed <= current_output_len); - - if consumed < current_output_len { - self.output_occupied_len = current_output_len.checked_sub(consumed).unwrap(); - // If we're blocking on I/O, the minor inefficiency of copying bytes to the - // start of the buffer is the least of our concerns... - // TODO Rotate moves more than we need to; copy_within now stable. - self.output.rotate_left(consumed); - } else { - self.output_occupied_len = 0; + /// Updates `output_range` accordingly. + fn flush_output(&mut self) -> Option> { + if self.output_range.end == 0 { + return None; + } + loop { + match self.write_to_delegate(self.output_range.clone()) { + Ok(0) => break Some(Ok(0)), + Ok(n) if n >= self.output_range.len() => { + self.output_range = 0..0; + break None; + } + Ok(n) => self.output_range.start += n, + Err(err) => break Some(Err(err)), } - }) + } } - /// Write all buffered encoded output. If this returns `Ok`, `self.output_occupied_len` is `0`. - /// - /// This is basically write_all for the remaining buffered data but without the undesirable - /// abort-on-`Ok(0)` behavior. - /// - /// # Errors + /// Flushes output buffer to the delegate ignoring interruptions. /// - /// Any error emitted by the delegate writer abort the write loop and is returned, unless it's - /// `Interrupted`, in which case the error is ignored and writes will continue. - fn write_all_encoded_output(&mut self) -> Result<()> { - while self.output_occupied_len > 0 { - let remaining_len = self.output_occupied_len; - match self.write_to_delegate(remaining_len) { - // try again on interrupts ala write_all - Err(ref e) if e.kind() == ErrorKind::Interrupted => {} - // other errors return - Err(e) => return Err(e), - // success no-ops because remaining length is already updated - Ok(_) => {} - }; + /// Like [`Self::flush_output`] but ignores [`ErrorKind::Interrupted`] + /// errors and converts `Ok(0)` to [`ErrorKind::WriteZero`]. + fn flush_all_output(&mut self) -> io::Result<()> { + if self.output_range.end == 0 { + return Ok(()); } + loop { + match self.write_to_delegate(self.output_range.clone()) { + Ok(0) => { + break Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write whole buffer", + )) + } + Ok(n) if n >= self.output_range.len() => { + self.output_range = 0..0; + break Ok(()); + } + Ok(n) => self.output_range.start += n, + Err(err) if err.kind() == ErrorKind::Interrupted => (), + Err(err) => break Err(err), + } + } + } - debug_assert_eq!(0, self.output_occupied_len); - Ok(()) + /// Writes given range of output buffer to the delegate. Performs exactly + /// one write. Sets `panicked` to `true` if delegate panics. + fn write_to_delegate(&mut self, range: std::ops::Range) -> io::Result { + self.panicked = true; + let res = self + .delegate + .as_mut() + .expect("Encoder has already had finish() called") + .write(&self.output[range]); + self.panicked = false; + res } /// Unwraps this `EncoderWriter`, returning the base writer it writes base64 encoded output @@ -262,38 +264,24 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> { /// # Errors /// /// Any errors emitted by the delegate writer are returned. - fn write(&mut self, input: &[u8]) -> Result { + fn write(&mut self, input: &[u8]) -> io::Result { if self.delegate.is_none() { panic!("Cannot write more after calling finish()"); } - if input.is_empty() { - return Ok(0); + if let Some(res) = self.flush_output() { + return res; } + debug_assert_eq!(0, self.output_range.len()); - // The contract of `Write::write` places some constraints on this implementation: - // - a call to `write()` represents at most one call to a wrapped `Write`, so we can't - // iterate over the input and encode multiple chunks. - // - Errors mean that "no bytes were written to this writer", so we need to reset the - // internal state to what it was before the error occurred - - // before reading any input, write any leftover encoded output from last time - if self.output_occupied_len > 0 { - let current_len = self.output_occupied_len; - return self - .write_to_delegate(current_len) - // did not read any input - .map(|_| 0); + if input.is_empty() { + return Ok(0); } - debug_assert_eq!(0, self.output_occupied_len); - // how many bytes, if any, were read into `extra` to create a triple to encode let mut extra_input_read_len = 0; let mut input = input; - let orig_extra_len = self.extra_input_occupied_len; - let mut encoded_size = 0; // always a multiple of MIN_ENCODE_CHUNK_SIZE let mut max_input_len = MAX_INPUT_LEN; @@ -322,8 +310,10 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> { input = &input[extra_input_read_len..]; - // consider extra to be used up, since we encoded it - self.extra_input_occupied_len = 0; + // Note: Not updating self.extra_input_occupied_len yet. It’s + // going to be zeroed at the end of the function if we + // successfully write some data to delegate. + // don't clobber where we just encoded to encoded_size = 4; // and don't read more than can be encoded @@ -367,20 +357,21 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> { &mut self.output[encoded_size..], ); - // not updating `self.output_occupied_len` here because if the below write fails, it should - // "never take place" -- the buffer contents we encoded are ignored and perhaps retried - // later, if the consumer chooses. - - self.write_to_delegate(encoded_size) - // no matter whether we wrote the full encoded buffer or not, we consumed the same - // input - .map(|_| extra_input_read_len + input_chunks_to_encode_len) - .map_err(|e| { - // in case we filled and encoded `extra`, reset extra_len - self.extra_input_occupied_len = orig_extra_len; + // Not updating `self.output_range` here because if the write fails, it + // should "never take place" -- the buffer contents we encoded are + // ignored and perhaps retried later, if the consumer chooses. - e - }) + self.write_to_delegate(0..encoded_size).map(|written| { + if written < encoded_size { + // Update output range to portion which is yet to be written. + self.output_range = written..encoded_size; + } else { + // Everything was written, leave output range empty. + debug_assert_eq!(0..0, self.output_range); + } + self.extra_input_occupied_len = 0; + extra_input_read_len + input_chunks_to_encode_len + }) } /// Because this is usually treated as OK to call multiple times, it will *not* flush any @@ -388,8 +379,8 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> { /// # Errors /// /// The first error that is not of [`ErrorKind::Interrupted`] will be returned. - fn flush(&mut self) -> Result<()> { - self.write_all_encoded_output()?; + fn flush(&mut self) -> io::Result<()> { + self.flush_all_output()?; self.delegate .as_mut() .expect("Writer must be present") diff --git a/src/write/encoder_tests.rs b/src/write/encoder_tests.rs index ce76d63..849200e 100644 --- a/src/write/encoder_tests.rs +++ b/src/write/encoder_tests.rs @@ -352,16 +352,7 @@ fn retrying_writes_that_error_with_interrupted_works() { bytes_consumed += input_len; } - loop { - let res = stream_encoder.finish(); - match res { - Ok(_) => break, - Err(e) => match e.kind() { - io::ErrorKind::Interrupted => continue, - _ => Err(e).unwrap(), // bail - }, - } - } + let _ = stream_encoder.finish().unwrap(); assert_eq!(orig_len, bytes_consumed); } @@ -412,13 +403,12 @@ fn writes_that_only_write_part_of_input_and_sometimes_interrupt_produce_correct_ // retry on interrupt match res { - Ok(len) => bytes_consumed += len, - Err(e) => match e.kind() { - io::ErrorKind::Interrupted => continue, - _ => { - panic!("should not see other errors"); - } - }, + Ok(0) => assert_eq!(0, input_len), + Ok(len) => { + assert!(len <= input_len); + bytes_consumed += len; + } + Err(e) => assert_eq!(io::ErrorKind::Interrupted, e.kind()), } } @@ -506,7 +496,7 @@ struct InterruptingWriter<'a, W: 'a + Write, R: 'a + Rng> { impl<'a, W: Write, R: Rng> Write for InterruptingWriter<'a, W, R> { fn write(&mut self, buf: &[u8]) -> io::Result { - if self.rng.gen_range(0.0..1.0) <= self.fraction { + if self.rng.gen_bool(self.fraction) { return Err(io::Error::new(io::ErrorKind::Interrupted, "interrupted")); } @@ -514,7 +504,7 @@ impl<'a, W: Write, R: Rng> Write for InterruptingWriter<'a, W, R> { } fn flush(&mut self) -> io::Result<()> { - if self.rng.gen_range(0.0..1.0) <= self.fraction { + if self.rng.gen_bool(self.fraction) { return Err(io::Error::new(io::ErrorKind::Interrupted, "interrupted")); } @@ -534,17 +524,21 @@ struct PartialInterruptingWriter<'a, W: 'a + Write, R: 'a + Rng> { impl<'a, W: Write, R: Rng> Write for PartialInterruptingWriter<'a, W, R> { fn write(&mut self, buf: &[u8]) -> io::Result { - if self.rng.gen_range(0.0..1.0) > self.no_interrupt_fraction { + if !self.rng.gen_bool(self.no_interrupt_fraction) { return Err(io::Error::new(io::ErrorKind::Interrupted, "interrupted")); } - if self.rng.gen_range(0.0..1.0) <= self.full_input_fraction || buf.is_empty() { + if buf.len() <= 1 || self.rng.gen_bool(self.full_input_fraction) { // pass through the buf untouched self.w.write(buf) } else { // only use a prefix of it - self.w - .write(&buf[0..(self.rng.gen_range(0..(buf.len() - 1)))]) + let end = if buf.len() == 2 { + 1 + } else { + self.rng.gen_range(1..(buf.len() - 1)) + }; + self.w.write(&buf[..end]) } }