Use Parquet OffsetIndex to prune IO with RowSelection #2473
Conversation
@@ -65,7 +65,7 @@ pub fn read_pages_locations<R: ChunkReader>(
     let (offset, total_length) = get_location_offset_and_total_length(chunks)?;

     //read all need data into buffer
-    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut reader = reader.get_read(offset, total_length)?;
Pretty sure this was a bug
Yup, I remember fixing it in a PR that got abandoned at some point
Thank you for this, I'll review first thing tomorrow. I like that you've found a way to allow sharing the page muxing logic in SerializedPageReader 👍
Looking good, will review again once rebased, but mostly minor nits
object_store/src/local.rs (outdated)
@@ -1068,6 +1068,7 @@ mod tests {
         integration.head(&path).await.unwrap();
     }

+    #[ignore]
?
Sorry, this test fails on my machine because of a permissions issue. Meant to revert before submitting.
        (mask, ranges)
    }

    pub fn selectors(&self) -> &[RowSelector] {
This doesn't appear to be used, so I think it can go. I've been trying to avoid exposing the internal layout of this type externally.
    let (mask, ranges) = selection.page_mask(&index);

    assert_eq!(mask, vec![false, true, true, false, true, true, false]);
    assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
Could we get a test where the final PageLocation is selected?
@@ -116,6 +118,62 @@ impl RowSelection {
         Self { selectors }
     }

+    /// Given an offset index, return a mask indicating which pages are selected along with their locations by `self`
+    pub fn page_mask(
Suggested change:
-    pub fn page_mask(
+    pub(crate) fn page_mask(

I don't think this is likely to be useful outside the crate
    pub fn page_mask(
        &self,
        page_locations: &[PageLocation],
    ) -> (Vec<bool>, Vec<Range<usize>>) {
It seems strange to me that this method would return Vec<Range<usize>> when it is called page_mask, and the caller clearly already has &[PageLocation] that can easily be combined with the mask...
The idea was to just do it in one shot to avoid iterating over the locations again to get the ranges, but perhaps it's better to avoid overloading.

Edit: Looking at this again, the mask was part of a previous design that is no longer relevant, so I think we can just rename this and only return the ranges.
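To illustrate the reviewer's point, here is a minimal sketch of deriving the byte ranges from a boolean mask and the page locations the caller already holds. The helper name and the stand-in PageLocation struct (with only the offset and compressed_page_size fields) are illustrative assumptions, not the PR's actual code:

```rust
use std::ops::Range;

/// Stand-in for the Thrift-generated PageLocation; only the fields used here.
struct PageLocation {
    offset: i64,
    compressed_page_size: i32,
}

/// Hypothetical helper: combine a page mask with the page locations to get byte ranges.
fn selected_ranges(mask: &[bool], page_locations: &[PageLocation]) -> Vec<Range<usize>> {
    mask.iter()
        .zip(page_locations)
        .filter(|(selected, _)| **selected)
        .map(|(_, loc)| {
            let start = loc.offset as usize;
            start..start + loc.compressed_page_size as usize
        })
        .collect()
}
```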
    Sparse {
        /// Length of the full column chunk
        length: usize,
        data: Vec<(usize, Bytes)>,
A comment explaining what these are would go a long way
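For instance, the documentation could look something like the sketch below. The field descriptions are inferred from how get_read looks pages up by offset and from the PR description's "dense"/"sparse" wording, so treat them as assumptions rather than the PR's actual comments:

```rust
use bytes::Bytes;

enum ColumnChunkData {
    /// Only the pages needed for the current row selection were fetched.
    Sparse {
        /// Length of the full column chunk, in bytes
        length: usize,
        /// The fetched pages, stored as (byte offset of the page, page bytes)
        /// and kept sorted by ascending offset so the page containing a given
        /// offset can be located when serving reads.
        data: Vec<(usize, Bytes)>,
    },
    /// The whole column chunk as one contiguous buffer (the "dense" encoding).
    Dense(Bytes),
}
```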
parquet/src/arrow/async_reader.rs (outdated)
            .find(|(offset, bytes)| {
                *offset <= start as usize && (start as usize - *offset) < bytes.len()
            })
            .map(|(_, bytes)| bytes.slice(0..length).reader())
The line above allows start to be greater than offset, but this won't return the correct slice in such a case?
parquet/src/arrow/async_reader.rs (outdated)
            ColumnChunkData::Sparse { data, .. } => data
                .iter()
                .find(|(offset, bytes)| {
                    *offset <= start as usize && (start as usize - *offset) < bytes.len()
Perhaps we should do an exact match? I think this should work?
I wasn't sure whether there is ever a case in which we fetch some subset of the page. Thinking about it more I don't believe that would ever be a valid use case.
        self.offset += page_header.compressed_page_size as usize;
    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
As data is sorted, you could consider https://doc.rust-lang.org/std/primitive.slice.html#method.binary_search or friends
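A rough sketch of what that could look like, assuming reads always start exactly at a page boundary (as concluded in the exact-match thread above) and that data stays sorted by offset; the function name is illustrative, not the PR's code:

```rust
use bytes::Bytes;

/// Hypothetical lookup: exact-match binary search over pages sorted by start offset,
/// replacing the linear `find`. Returns a cheap refcounted clone of the page bytes.
fn find_page(data: &[(usize, Bytes)], start: usize) -> Option<Bytes> {
    data.binary_search_by_key(&start, |(offset, _)| *offset)
        .ok()
        .map(|idx| data[idx].1.clone())
}
```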
        let page_header = read_page_header(&mut cursor)?;
        self.offset += cursor.position() as usize;
        self.offset += page_header.compressed_page_size as usize;
    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
It is worth noting this will currently represent a performance regression, as the previous code avoided a copy - https://github.com/apache/arrow-rs/pull/2473/files#diff-f6b1a106d47a16504d4a16d57a6632872ddf596f337ac0640a13523dccc2d4d4L615

I will add a get_bytes method to ChunkReader to avoid this
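A minimal sketch of the kind of get_bytes addition being described; the exact signature on ChunkReader is an assumption here, and the point is only that returning Bytes lets an in-memory reader hand back a zero-copy slice:

```rust
use bytes::Bytes;
use parquet::errors::Result;

/// Hypothetical extension trait sketching the proposed `get_bytes`; not the actual
/// ChunkReader API, just the shape of the idea.
trait GetBytes {
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes>;
}

impl GetBytes for Bytes {
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        let start = start as usize;
        // Bytes::slice only adjusts pointers and a refcount, so no data is copied.
        Ok(self.slice(start..start + length))
    }
}
```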
I think this is good to go, thank you. There are some minor nits still, but I'm happy for these to be addressed in a follow-up. Once merged I will rebase #2478 onto this.
Docs appear to be failing
Benchmark runs are scheduled for baseline = 42e9531 and contender = 2185ce2. 2185ce2 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?

Closes #2426.

Rationale for this change

When we have a RowSelection and an OffsetIndex we can reduce IO by fetching only the pages selected. This also builds on #2464 to remove InMemoryColumnChunk and unify everything to use SerializedPageReader.

What changes are included in this PR?

We can represent pre-fetched column chunks as either a "dense" encoding (just Bytes) or a "sparse" encoding which contains only the pages relevant to a given RowSelection. Also remove InMemoryColumnChunk to help unify the sync and async parquet paths.

Are there any user-facing changes?