Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved performance of reading #157

Merged
merged 1 commit into from
Jul 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions src/bloom_filter/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ pub fn read<R: Read + Seek>(
bitset.clear();
return Ok(());
}
// read bitset
if header.num_bytes as usize > bitset.capacity() {
*bitset = vec![0; header.num_bytes as usize]
} else {
bitset.clear();
bitset.resize(header.num_bytes as usize, 0); // populate with zeros
}

reader.read_exact(bitset)?;
let length: usize = header.num_bytes.try_into()?;

bitset.clear();
bitset.try_reserve(length)?;
reader.by_ref().take(length as u64).read_to_end(bitset)?;

Ok(())
}
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ impl From<std::io::Error> for Error {
}
}

impl From<std::collections::TryReserveError> for Error {
fn from(e: std::collections::TryReserveError) -> Error {
Error::General(format!("OOM: {}", e))
}
}

impl From<std::num::TryFromIntError> for Error {
fn from(e: std::num::TryFromIntError) -> Error {
Error::OutOfSpec(format!("Number must be zero or positive: {}", e))
Expand Down
12 changes: 8 additions & 4 deletions src/read/indexes/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ pub fn read_columns_indexes<R: Read + Seek>(
let length = lengths.iter().sum::<usize>();

reader.seek(SeekFrom::Start(offset))?;
let mut data = vec![0; length];
reader.read_exact(&mut data)?;

let mut data = vec![];
data.try_reserve(length)?;
reader.by_ref().take(length as u64).read_to_end(&mut data)?;

deserialize_column_indexes(chunks, &data, lengths)
}
Expand Down Expand Up @@ -122,8 +124,10 @@ pub fn read_pages_locations<R: Read + Seek>(
let length = lengths.iter().sum::<usize>();

reader.seek(SeekFrom::Start(offset))?;
let mut data = vec![0; length];
reader.read_exact(&mut data)?;

let mut data = vec![];
data.try_reserve(length)?;
reader.by_ref().take(length as u64).read_to_end(&mut data)?;

deserialize_page_locations(&data, chunks.len())
}
17 changes: 13 additions & 4 deletions src/read/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
reader.seek(SeekFrom::End(-(default_end_len as i64)))?;
let mut buffer = vec![0; default_end_len];
reader.read_exact(&mut buffer)?;

let mut buffer = Vec::with_capacity(default_end_len);
reader
.by_ref()
.take(default_end_len as u64)
.read_to_end(&mut buffer)?;

// check this is indeed a parquet file
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
Expand Down Expand Up @@ -87,8 +91,13 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
} else {
// the end of file read by default is not long enough, read again including all metadata.
reader.seek(SeekFrom::End(-(footer_len as i64)))?;
let mut buffer = vec![0; footer_len as usize];
reader.read_exact(&mut buffer)?;

buffer.clear();
buffer.try_reserve(footer_len as usize)?;
reader
.by_ref()
.take(footer_len as u64)
.read_to_end(&mut buffer)?;

Cursor::new(buffer)
};
Expand Down
17 changes: 4 additions & 13 deletions src/read/page/indexed_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,7 @@ pub struct IndexedPageReader<R: Read + Seek> {
pages: VecDeque<FilteredPage>,
}

fn resize_buffer(buffer: &mut Vec<u8>, length: usize) {
// prepare buffer
if length > buffer.len() {
// dealloc and ignore region, replacing it by a new region
*buffer = vec![0u8; length];
} else {
buffer.clear();
buffer.resize(length, 0);
}
}

#[allow(clippy::ptr_arg)] // false positive
fn read_page<R: Read + Seek>(
reader: &mut R,
start: u64,
Expand All @@ -66,8 +56,9 @@ fn read_page<R: Read + Seek>(
reader.seek(SeekFrom::Start(start))?;

// read [header][data] to buffer
resize_buffer(buffer, length);
reader.read_exact(buffer)?;
buffer.clear();
buffer.try_reserve(length)?;
reader.by_ref().take(length as u64).read_to_end(buffer)?;

// deserialize [header]
let mut reader = Cursor::new(buffer);
Expand Down
24 changes: 9 additions & 15 deletions src/read/page/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,21 +195,15 @@ pub(super) fn build_page<R: Read>(
.map(|x| x.num_values() as i64)
.unwrap_or_default();

let read_size = page_header.compressed_page_size as usize;
if read_size > 0 {
if read_size > buffer.capacity() {
// dealloc and ignore region, replacing it by a new region.
// This won't reallocate - it frees and calls `alloc_zeroed`
*buffer = vec![0; read_size];
} else if read_size > buffer.len() {
// fill what we need with zeros so that we can use them in `Read`.
// This won't reallocate
buffer.resize(read_size, 0);
} else {
buffer.truncate(read_size);
}
reader.reader.read_exact(buffer)?;
}
let read_size: usize = page_header.compressed_page_size.try_into()?;

buffer.clear();
buffer.try_reserve(read_size)?;
reader
.reader
.by_ref()
.take(read_size as u64)
.read_to_end(buffer)?;

let result = finish_page(
page_header,
Expand Down
13 changes: 7 additions & 6 deletions src/read/page/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn _get_page_stream<R: AsyncRead + Unpin + Send>(
let data_header = get_page_header(&page_header)?;
seen_values += data_header.as_ref().map(|x| x.num_values() as i64).unwrap_or_default();

let read_size = page_header.compressed_page_size as i64;
let read_size: usize = page_header.compressed_page_size.try_into()?;

if let Some(data_header) = data_header {
if !pages_filter(&descriptor, &data_header) {
Expand All @@ -89,11 +89,12 @@ fn _get_page_stream<R: AsyncRead + Unpin + Send>(
}

// followed by the buffer
let read_size = read_size as usize;
if read_size > 0 {
buffer.resize(read_size, 0);
reader.read_exact(&mut buffer).await?;
}
buffer.clear();
buffer.try_reserve(read_size)?;
reader
.take(read_size as u64)
.read_to_end(&mut buffer).await?;

let result = finish_page(
page_header,
&mut buffer,
Expand Down
18 changes: 14 additions & 4 deletions src/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ pub async fn read_metadata<R: AsyncRead + AsyncSeek + Send + std::marker::Unpin>
reader
.seek(SeekFrom::End(-(default_end_len as i64)))
.await?;
let mut buffer = vec![0; default_end_len];
reader.read_exact(&mut buffer).await?;

let mut buffer = vec![];
buffer.try_reserve(default_end_len)?;
reader
.take(default_end_len as u64)
.read_to_end(&mut buffer)
.await?;

// check this is indeed a parquet file
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
Expand Down Expand Up @@ -77,8 +82,13 @@ pub async fn read_metadata<R: AsyncRead + AsyncSeek + Send + std::marker::Unpin>
} else {
// the end of file read by default is not long enough, read again including all metadata.
reader.seek(SeekFrom::End(-(footer_len as i64))).await?;
let mut buffer = vec![0; footer_len as usize];
reader.read_exact(&mut buffer).await?;

buffer.clear();
buffer.try_reserve(footer_len as usize)?;
reader
.take(footer_len as u64)
.read_to_end(&mut buffer)
.await?;

Cursor::new(buffer)
};
Expand Down