Skip to content

Commit

Permalink
Improved performance of reading (#157)
Browse files · Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jul 2, 2022
1 parent 111f3e6 commit 4829365
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 54 deletions.
14 changes: 6 additions & 8 deletions src/bloom_filter/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ pub fn read<R: Read + Seek>(
bitset.clear();
return Ok(());
}
// read bitset
if header.num_bytes as usize > bitset.capacity() {
*bitset = vec![0; header.num_bytes as usize]
} else {
bitset.clear();
bitset.resize(header.num_bytes as usize, 0); // populate with zeros
}

reader.read_exact(bitset)?;
let length: usize = header.num_bytes.try_into()?;

bitset.clear();
bitset.try_reserve(length)?;
reader.by_ref().take(length as u64).read_to_end(bitset)?;

Ok(())
}
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ impl From<std::io::Error> for Error {
}
}

impl From<std::collections::TryReserveError> for Error {
fn from(e: std::collections::TryReserveError) -> Error {
Error::General(format!("OOM: {}", e))
}
}

impl From<std::num::TryFromIntError> for Error {
fn from(e: std::num::TryFromIntError) -> Error {
Error::OutOfSpec(format!("Number must be zero or positive: {}", e))
Expand Down
12 changes: 8 additions & 4 deletions src/read/indexes/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ pub fn read_columns_indexes<R: Read + Seek>(
let length = lengths.iter().sum::<usize>();

reader.seek(SeekFrom::Start(offset))?;
let mut data = vec![0; length];
reader.read_exact(&mut data)?;

let mut data = vec![];
data.try_reserve(length)?;
reader.by_ref().take(length as u64).read_to_end(&mut data)?;

deserialize_column_indexes(chunks, &data, lengths)
}
Expand Down Expand Up @@ -122,8 +124,10 @@ pub fn read_pages_locations<R: Read + Seek>(
let length = lengths.iter().sum::<usize>();

reader.seek(SeekFrom::Start(offset))?;
let mut data = vec![0; length];
reader.read_exact(&mut data)?;

let mut data = vec![];
data.try_reserve(length)?;
reader.by_ref().take(length as u64).read_to_end(&mut data)?;

deserialize_page_locations(&data, chunks.len())
}
17 changes: 13 additions & 4 deletions src/read/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
reader.seek(SeekFrom::End(-(default_end_len as i64)))?;
let mut buffer = vec![0; default_end_len];
reader.read_exact(&mut buffer)?;

let mut buffer = Vec::with_capacity(default_end_len);
reader
.by_ref()
.take(default_end_len as u64)
.read_to_end(&mut buffer)?;

// check this is indeed a parquet file
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
Expand Down Expand Up @@ -87,8 +91,13 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
} else {
// the end of file read by default is not long enough, read again including all metadata.
reader.seek(SeekFrom::End(-(footer_len as i64)))?;
let mut buffer = vec![0; footer_len as usize];
reader.read_exact(&mut buffer)?;

buffer.clear();
buffer.try_reserve(footer_len as usize)?;
reader
.by_ref()
.take(footer_len as u64)
.read_to_end(&mut buffer)?;

Cursor::new(buffer)
};
Expand Down
17 changes: 4 additions & 13 deletions src/read/page/indexed_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,7 @@ pub struct IndexedPageReader<R: Read + Seek> {
pages: VecDeque<FilteredPage>,
}

/// Prepares `buffer` to hold exactly `length` zeroed bytes, reusing the
/// existing allocation when it is already large enough.
fn resize_buffer(buffer: &mut Vec<u8>, length: usize) {
    if length <= buffer.len() {
        // The current region is big enough: empty it and re-grow with zeros
        // so every byte is zeroed without reallocating.
        buffer.clear();
        buffer.resize(length, 0);
    } else {
        // Too small: drop the old region and allocate a fresh zeroed one.
        *buffer = vec![0u8; length];
    }
}

#[allow(clippy::ptr_arg)] // false positive
fn read_page<R: Read + Seek>(
reader: &mut R,
start: u64,
Expand All @@ -66,8 +56,9 @@ fn read_page<R: Read + Seek>(
reader.seek(SeekFrom::Start(start))?;

// read [header][data] to buffer
resize_buffer(buffer, length);
reader.read_exact(buffer)?;
buffer.clear();
buffer.try_reserve(length)?;
reader.by_ref().take(length as u64).read_to_end(buffer)?;

// deserialize [header]
let mut reader = Cursor::new(buffer);
Expand Down
24 changes: 9 additions & 15 deletions src/read/page/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,21 +195,15 @@ pub(super) fn build_page<R: Read>(
.map(|x| x.num_values() as i64)
.unwrap_or_default();

let read_size = page_header.compressed_page_size as usize;
if read_size > 0 {
if read_size > buffer.capacity() {
// dealloc and ignore region, replacing it by a new region.
// This won't reallocate - it frees and calls `alloc_zeroed`
*buffer = vec![0; read_size];
} else if read_size > buffer.len() {
// fill what we need with zeros so that we can use them in `Read`.
// This won't reallocate
buffer.resize(read_size, 0);
} else {
buffer.truncate(read_size);
}
reader.reader.read_exact(buffer)?;
}
let read_size: usize = page_header.compressed_page_size.try_into()?;

buffer.clear();
buffer.try_reserve(read_size)?;
reader
.reader
.by_ref()
.take(read_size as u64)
.read_to_end(buffer)?;

let result = finish_page(
page_header,
Expand Down
13 changes: 7 additions & 6 deletions src/read/page/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn _get_page_stream<R: AsyncRead + Unpin + Send>(
let data_header = get_page_header(&page_header)?;
seen_values += data_header.as_ref().map(|x| x.num_values() as i64).unwrap_or_default();

let read_size = page_header.compressed_page_size as i64;
let read_size: usize = page_header.compressed_page_size.try_into()?;

if let Some(data_header) = data_header {
if !pages_filter(&descriptor, &data_header) {
Expand All @@ -89,11 +89,12 @@ fn _get_page_stream<R: AsyncRead + Unpin + Send>(
}

// followed by the buffer
let read_size = read_size as usize;
if read_size > 0 {
buffer.resize(read_size, 0);
reader.read_exact(&mut buffer).await?;
}
buffer.clear();
buffer.try_reserve(read_size)?;
reader
.take(read_size as u64)
.read_to_end(&mut buffer).await?;

let result = finish_page(
page_header,
&mut buffer,
Expand Down
18 changes: 14 additions & 4 deletions src/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ pub async fn read_metadata<R: AsyncRead + AsyncSeek + Send + std::marker::Unpin>
reader
.seek(SeekFrom::End(-(default_end_len as i64)))
.await?;
let mut buffer = vec![0; default_end_len];
reader.read_exact(&mut buffer).await?;

let mut buffer = vec![];
buffer.try_reserve(default_end_len)?;
reader
.take(default_end_len as u64)
.read_to_end(&mut buffer)
.await?;

// check this is indeed a parquet file
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
Expand Down Expand Up @@ -77,8 +82,13 @@ pub async fn read_metadata<R: AsyncRead + AsyncSeek + Send + std::marker::Unpin>
} else {
// the end of file read by default is not long enough, read again including all metadata.
reader.seek(SeekFrom::End(-(footer_len as i64))).await?;
let mut buffer = vec![0; footer_len as usize];
reader.read_exact(&mut buffer).await?;

buffer.clear();
buffer.try_reserve(footer_len as usize)?;
reader
.take(footer_len as u64)
.read_to_end(&mut buffer)
.await?;

Cursor::new(buffer)
};
Expand Down

0 comments on commit 4829365

Please sign in to comment.