From ad3df76c0cc3f5284500cb8d40735bd097d16f95 Mon Sep 17 00:00:00 2001 From: The 8472 Date: Fri, 24 Nov 2023 01:00:25 +0100 Subject: [PATCH] unify read_to_end and io::copy impls for reading into a Vec --- library/std/src/io/copy.rs | 69 +----------------- library/std/src/io/copy/tests.rs | 11 +-- library/std/src/io/mod.rs | 118 +++++++++++++++++++++---------- 3 files changed, 91 insertions(+), 107 deletions(-) diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs index 4d51a719f6cac..28c97dbbf22c0 100644 --- a/library/std/src/io/copy.rs +++ b/library/std/src/io/copy.rs @@ -1,7 +1,6 @@ use super::{BorrowedBuf, BufReader, BufWriter, Read, Result, Write, DEFAULT_BUF_SIZE}; use crate::alloc::Allocator; use crate::cmp; -use crate::cmp::min; use crate::collections::VecDeque; use crate::io::IoSlice; use crate::mem::MaybeUninit; @@ -256,79 +255,17 @@ impl BufferedWriterSpec for BufWriter { } } -impl BufferedWriterSpec for Vec { +impl BufferedWriterSpec for Vec { fn buffer_size(&self) -> usize { cmp::max(DEFAULT_BUF_SIZE, self.capacity() - self.len()) } fn copy_from(&mut self, reader: &mut R) -> Result { - let mut bytes = 0; - - // avoid inflating empty/small vecs before we have determined that there's anything to read - if self.capacity() < DEFAULT_BUF_SIZE { - let stack_read_limit = DEFAULT_BUF_SIZE as u64; - bytes = stack_buffer_copy(&mut reader.take(stack_read_limit), self)?; - // fewer bytes than requested -> EOF reached - if bytes < stack_read_limit { - return Ok(bytes); - } - } - - // don't immediately offer the vec's whole spare capacity, otherwise - // we might have to fully initialize it if the reader doesn't have a custom read_buf() impl - let mut max_read_size = DEFAULT_BUF_SIZE; - - loop { - self.reserve(DEFAULT_BUF_SIZE); - let mut initialized_spare_capacity = 0; - - loop { - let buf = self.spare_capacity_mut(); - let read_size = min(max_read_size, buf.len()); - let mut buf = BorrowedBuf::from(&mut buf[..read_size]); - // SAFETY: init is either 0 or the init_len from the previous iteration. - unsafe { - buf.set_init(initialized_spare_capacity); - } - match reader.read_buf(buf.unfilled()) { - Ok(()) => { - let bytes_read = buf.len(); - - // EOF - if bytes_read == 0 { - return Ok(bytes); - } - - // the reader is returning short reads but it doesn't call ensure_init() - if buf.init_len() < buf.capacity() { - max_read_size = usize::MAX; - } - // the reader hasn't returned short reads so far - if bytes_read == buf.capacity() { - max_read_size *= 2; - } - - initialized_spare_capacity = buf.init_len() - bytes_read; - bytes += bytes_read as u64; - // SAFETY: BorrowedBuf guarantees all of its filled bytes are init - // and the number of read bytes can't exceed the spare capacity since - // that's what the buffer is borrowing from. - unsafe { self.set_len(self.len() + bytes_read) }; - - // spare capacity full, reserve more - if self.len() == self.capacity() { - break; - } - } - Err(e) if e.is_interrupted() => continue, - Err(e) => return Err(e), - } - } - } + reader.read_to_end(self).map(|bytes| u64::try_from(bytes).expect("usize overflowed u64")) } } -fn stack_buffer_copy( +pub(super) fn stack_buffer_copy( reader: &mut R, writer: &mut W, ) -> Result { diff --git a/library/std/src/io/copy/tests.rs b/library/std/src/io/copy/tests.rs index af137eaf856f5..a1f909a3c5386 100644 --- a/library/std/src/io/copy/tests.rs +++ b/library/std/src/io/copy/tests.rs @@ -82,13 +82,16 @@ fn copy_specializes_bufreader() { #[test] fn copy_specializes_to_vec() { - let cap = 123456; - let mut source = ShortReader { cap, observed_buffer: 0, read_size: 1337 }; + let cap = DEFAULT_BUF_SIZE * 10; + let mut source = ShortReader { cap, observed_buffer: 0, read_size: DEFAULT_BUF_SIZE }; let mut sink = Vec::new(); - assert_eq!(cap as u64, io::copy(&mut source, &mut sink).unwrap()); + let copied = io::copy(&mut source, &mut sink).unwrap(); + assert_eq!(cap as u64, copied); + assert_eq!(sink.len() as u64, copied); assert!( source.observed_buffer > DEFAULT_BUF_SIZE, - "expected a large buffer to be provided to the reader" + "expected a large buffer to be provided to the reader, got {}", + source.observed_buffer ); } diff --git a/library/std/src/io/mod.rs b/library/std/src/io/mod.rs index 7d70a0bac24fd..29a4b727b6699 100644 --- a/library/std/src/io/mod.rs +++ b/library/std/src/io/mod.rs @@ -397,12 +397,16 @@ where } } -// This uses an adaptive system to extend the vector when it fills. We want to -// avoid paying to allocate and zero a huge chunk of memory if the reader only -// has 4 bytes while still making large reads if the reader does have a ton -// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every -// time is 4,500 times (!) slower than a default reservation size of 32 if the -// reader has a very small amount of data to return. +// Here we must serve many masters with conflicting goals: +// +// - avoid allocating unless necessary +// - avoid overallocating if we know the exact size (#89165) +// - avoid passing large buffers to readers that always initialize the free capacity if they perform short reads (#23815, #23820) +// - pass large buffers to readers that do not initialize the spare capacity. this can amortize per-call overheads +// - and finally pass not-too-small and not-too-large buffers to Windows read APIs because they manage to suffer from both problems +// at the same time, i.e. small reads suffer from syscall overhead, all reads incur initialization cost +// proportional to buffer size (#110650) +// pub(crate) fn default_read_to_end( r: &mut R, buf: &mut Vec, @@ -412,20 +416,58 @@ pub(crate) fn default_read_to_end( let start_cap = buf.capacity(); // Optionally limit the maximum bytes read on each iteration. // This adds an arbitrary fiddle factor to allow for more data than we expect. - let max_read_size = - size_hint.and_then(|s| s.checked_add(1024)?.checked_next_multiple_of(DEFAULT_BUF_SIZE)); + let mut max_read_size = size_hint + .and_then(|s| s.checked_add(1024)?.checked_next_multiple_of(DEFAULT_BUF_SIZE)) + .unwrap_or(DEFAULT_BUF_SIZE); let mut initialized = 0; // Extra initialized bytes from previous loop iteration + + const PROBE_SIZE: usize = 32; + + fn small_probe_read(r: &mut R, buf: &mut Vec) -> Result { + let mut probe = [0u8; PROBE_SIZE]; + + loop { + match r.read(&mut probe) { + Ok(n) => { + buf.extend_from_slice(&probe[..n]); + return Ok(n); + } + Err(ref e) if e.is_interrupted() => continue, + Err(e) => return Err(e), + } + } + } + + // avoid inflating empty/small vecs before we have determined that there's anything to read + if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE { + let read = small_probe_read(r, buf)?; + + if read == 0 { + return Ok(0); + } + } + loop { + if buf.len() == buf.capacity() && buf.capacity() == start_cap { + // The buffer might be an exact fit. Let's read into a probe buffer + // and see if it returns `Ok(0)`. If so, we've avoided an + // unnecessary doubling of the capacity. But if not, append the + // probe buffer to the primary buffer and let its capacity grow. + let read = small_probe_read(r, buf)?; + + if read == 0 { + return Ok(buf.len() - start_len); + } + } + if buf.len() == buf.capacity() { - buf.reserve(32); // buf is full, need more space + buf.reserve(PROBE_SIZE); // buf is full, need more space } let mut spare = buf.spare_capacity_mut(); - if let Some(size) = max_read_size { - let len = cmp::min(spare.len(), size); - spare = &mut spare[..len] - } + let buf_len = cmp::min(spare.len(), max_read_size); + spare = &mut spare[..buf_len]; let mut read_buf: BorrowedBuf<'_> = spare.into(); // SAFETY: These bytes were initialized but not filled in the previous loop @@ -434,42 +476,44 @@ pub(crate) fn default_read_to_end( } let mut cursor = read_buf.unfilled(); - match r.read_buf(cursor.reborrow()) { - Ok(()) => {} - Err(e) if e.is_interrupted() => continue, - Err(e) => return Err(e), + loop { + match r.read_buf(cursor.reborrow()) { + Ok(()) => break, + Err(e) if e.is_interrupted() => continue, + Err(e) => return Err(e), + } } - if cursor.written() == 0 { + let unfilled_but_initialized = cursor.init_ref().len(); + let bytes_read = cursor.written(); + let was_fully_initialized = read_buf.init_len() == buf_len; + + if bytes_read == 0 { return Ok(buf.len() - start_len); } // store how much was initialized but not filled - initialized = cursor.init_ref().len(); + initialized = unfilled_but_initialized; // SAFETY: BorrowedBuf's invariants mean this much memory is initialized. unsafe { - let new_len = read_buf.filled().len() + buf.len(); + let new_len = bytes_read + buf.len(); buf.set_len(new_len); } - if buf.len() == buf.capacity() && buf.capacity() == start_cap { - // The buffer might be an exact fit. Let's read into a probe buffer - // and see if it returns `Ok(0)`. If so, we've avoided an - // unnecessary doubling of the capacity. But if not, append the - // probe buffer to the primary buffer and let its capacity grow. - let mut probe = [0u8; 32]; - - loop { - match r.read(&mut probe) { - Ok(0) => return Ok(buf.len() - start_len), - Ok(n) => { - buf.extend_from_slice(&probe[..n]); - break; - } - Err(ref e) if e.is_interrupted() => continue, - Err(e) => return Err(e), - } + // Use heuristics to determine the max read size if no initial size hint was provided + if size_hint.is_none() { + // The reader is returning short reads but it doesn't call ensure_init(). + // In that case we no longer need to restrict read sizes to avoid + // initialization costs. + if !was_fully_initialized { + max_read_size = usize::MAX; + } + + // we have passed a larger buffer than previously and the + // reader still hasn't returned a short read + if buf_len >= max_read_size && bytes_read == buf_len { + max_read_size = max_read_size.saturating_mul(2); } } }