Skip to content

Commit

Permalink
Fixed design in filtered pages
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 2, 2022
1 parent fabafed commit 2cea410
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 134 deletions.
101 changes: 39 additions & 62 deletions src/indexes/intervals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,38 @@ use crate::error::Error;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Interval {
/// Its start
pub start: u64,
pub start: usize,
/// Its length
pub length: u64,
pub length: usize,
}

impl Interval {
/// Create a new interal
pub fn new(start: u64, length: u64) -> Self {
pub fn new(start: usize, length: usize) -> Self {
Self { start, length }
}
}

/// Returns the set of (row) intervals of the pages.
fn compute_page_row_intervals(
locations: &[PageLocation],
num_rows: u64,
num_rows: usize,
) -> Result<Vec<Interval>, Error> {
if locations.is_empty() {
return Ok(vec![]);
};

let last = (|| {
let first = locations.last().unwrap().first_row_index;
let start = u64::try_from(first)?;
let start: usize = locations.last().unwrap().first_row_index.try_into()?;
let length = num_rows - start;
Result::<_, Error>::Ok(Interval::new(start, length))
})();

let pages_lengths = locations
.windows(2)
.map(|x| {
let start = u64::try_from(x[0].first_row_index)?;
let length = u64::try_from(x[1].first_row_index - x[0].first_row_index)?;
let start = usize::try_from(x[0].first_row_index)?;
let length = usize::try_from(x[1].first_row_index - x[0].first_row_index)?;
Ok(Interval::new(start, length))
})
.chain(std::iter::once(last));
Expand All @@ -50,7 +49,7 @@ fn compute_page_row_intervals(
pub fn compute_rows(
selected: &[bool],
locations: &[PageLocation],
num_rows: u64,
num_rows: usize,
) -> Result<Vec<Interval>, Error> {
let page_intervals = compute_page_row_intervals(locations, num_rows)?;

Expand All @@ -70,74 +69,52 @@ pub fn compute_rows(
}

/// An enum describing a page that was either selected in a filter pushdown or skipped
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FilteredPage {
Select {
/// Location of the page in the file
start: u64,
length: usize,
/// Location of rows to select in the page
rows_offset: usize,
rows_length: usize,
},
Skip {
/// Location of the page in the file
start: u64,
length: usize,
/// number of rows that are skip by skipping this page
num_rows: usize,
},
}

impl FilteredPage {
pub fn start(&self) -> u64 {
match self {
Self::Select { start, .. } => *start,
Self::Skip { start, .. } => *start,
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FilteredPage {
/// Location of the page in the file
pub start: u64,
pub length: usize,
/// rows to select from the page
pub selected_rows: Vec<Interval>,
pub num_rows: usize,
}

fn is_in(probe: Interval, intervals: &[Interval]) -> Option<Interval> {
intervals.iter().find_map(|interval| {
let interval_end = interval.start + interval.length;
let probe_end = probe.start + probe.length;
let overlaps = (probe.start < interval_end) && (probe_end > interval.start);
if overlaps {
let start = interval.start.max(probe.start);
let end = interval_end.min(probe_end);
Some(Interval::new(start - probe.start, end - start))
} else {
None
}
})
fn is_in(probe: Interval, intervals: &[Interval]) -> Vec<Interval> {
intervals
.iter()
.filter_map(|interval| {
let interval_end = interval.start + interval.length;
let probe_end = probe.start + probe.length;
let overlaps = (probe.start < interval_end) && (probe_end > interval.start);
if overlaps {
let start = interval.start.max(probe.start);
let end = interval_end.min(probe_end);
Some(Interval::new(start - probe.start, end - start))
} else {
None
}
})
.collect()
}

/// Given a set of selected [Interval]s of rows and the set of page locations, returns the
pub fn select_pages(
intervals: &[Interval],
locations: &[PageLocation],
num_rows: u64,
num_rows: usize,
) -> Result<Vec<FilteredPage>, Error> {
let page_intervals = compute_page_row_intervals(locations, num_rows)?;

page_intervals
.into_iter()
.zip(locations.iter())
.map(|(interval, location)| {
Ok(if let Some(overlap) = is_in(interval, intervals) {
FilteredPage::Select {
start: location.offset.try_into()?,
length: location.compressed_page_size.try_into()?,
rows_offset: overlap.start.try_into()?,
rows_length: overlap.length.try_into()?,
}
} else {
FilteredPage::Skip {
start: location.offset.try_into()?,
length: location.compressed_page_size.try_into()?,
num_rows: interval.length.try_into()?,
}
let selected_rows = is_in(interval, intervals);
Ok(FilteredPage {
start: location.offset.try_into()?,
length: location.compressed_page_size.try_into()?,
selected_rows,
num_rows: interval.length,
})
})
.collect()
Expand Down
45 changes: 25 additions & 20 deletions src/indexes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,17 @@ mod tests {
assert_eq!(
pages,
vec![
FilteredPage::Skip {
FilteredPage {
start: 100,
length: 10,
selected_rows: vec![],
num_rows: 5
},
FilteredPage::Select {
FilteredPage {
start: 110,
length: 20,
rows_offset: 0,
rows_length: 5
selected_rows: vec![Interval::new(0, 5)],
num_rows: 5
}
]
);
Expand Down Expand Up @@ -114,15 +115,16 @@ mod tests {
assert_eq!(
pages,
vec![
FilteredPage::Select {
FilteredPage {
start: 100,
length: 20,
rows_offset: 5,
rows_length: 5
selected_rows: vec![Interval::new(5, 5)],
num_rows: 10,
},
FilteredPage::Skip {
FilteredPage {
start: 120,
length: 20,
selected_rows: vec![],
num_rows: 90
},
]
Expand Down Expand Up @@ -158,21 +160,22 @@ mod tests {
assert_eq!(
pages,
vec![
FilteredPage::Select {
FilteredPage {
start: 100,
length: 20,
rows_offset: 5,
rows_length: 5
selected_rows: vec![Interval::new(5, 5)],
num_rows: 10,
},
FilteredPage::Select {
FilteredPage {
start: 120,
length: 20,
rows_offset: 0,
rows_length: 1
selected_rows: vec![Interval::new(0, 1)],
num_rows: 90,
},
FilteredPage::Skip {
FilteredPage {
start: 140,
length: 20,
selected_rows: vec![],
num_rows: 100
},
]
Expand Down Expand Up @@ -208,20 +211,22 @@ mod tests {
assert_eq!(
pages,
vec![
FilteredPage::Select {
FilteredPage {
start: 100,
length: 20,
rows_offset: 0,
rows_length: 1
selected_rows: vec![Interval::new(0, 1)],
num_rows: 10,
},
FilteredPage::Skip {
FilteredPage {
start: 120,
length: 20,
selected_rows: vec![],
num_rows: 90
},
FilteredPage::Skip {
FilteredPage {
start: 140,
length: 20,
selected_rows: vec![],
num_rows: 100
},
]
Expand Down
62 changes: 50 additions & 12 deletions src/page/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use parquet_format_async_temp::{
DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, PageHeader as ParquetPageHeader,
};

use crate::indexes::Interval;
pub use crate::parquet_bridge::{DataPageHeaderExt, PageType};

use crate::compression::Compression;
Expand All @@ -28,7 +29,7 @@ pub struct CompressedDataPage {
pub(crate) descriptor: Descriptor,

// The offset and length in rows
pub(crate) rows: Option<(usize, usize)>,
pub(crate) selected_rows: Option<Vec<Interval>>,
}

impl CompressedDataPage {
Expand All @@ -40,7 +41,28 @@ impl CompressedDataPage {
uncompressed_page_size: usize,
dictionary_page: Option<Arc<dyn DictPage>>,
descriptor: Descriptor,
rows: Option<(usize, usize)>,
rows: Option<usize>,
) -> Self {
Self::new_read(
header,
buffer,
compression,
uncompressed_page_size,
dictionary_page,
descriptor,
rows.map(|x| vec![Interval::new(0, x)]),
)
}

/// Returns a new [`CompressedDataPage`].
pub(crate) fn new_read(
header: DataPageHeader,
buffer: Vec<u8>,
compression: Compression,
uncompressed_page_size: usize,
dictionary_page: Option<Arc<dyn DictPage>>,
descriptor: Descriptor,
selected_rows: Option<Vec<Interval>>,
) -> Self {
Self {
header,
Expand All @@ -49,7 +71,7 @@ impl CompressedDataPage {
uncompressed_page_size,
dictionary_page,
descriptor,
rows,
selected_rows,
}
}

Expand All @@ -74,8 +96,8 @@ impl CompressedDataPage {

/// the rows to be selected by this page.
/// When `None`, all rows are to be considered.
pub fn rows(&self) -> Option<(usize, usize)> {
self.rows
pub fn selected_rows(&self) -> Option<&[Interval]> {
self.selected_rows.as_deref()
}

pub fn num_values(&self) -> usize {
Expand Down Expand Up @@ -120,7 +142,7 @@ pub struct DataPage {
pub(super) buffer: Vec<u8>,
pub(super) dictionary_page: Option<Arc<dyn DictPage>>,
pub descriptor: Descriptor,
pub rows: Option<(usize, usize)>,
pub selected_rows: Option<Vec<Interval>>,
}

impl DataPage {
Expand All @@ -129,14 +151,30 @@ impl DataPage {
buffer: Vec<u8>,
dictionary_page: Option<Arc<dyn DictPage>>,
descriptor: Descriptor,
rows: Option<(usize, usize)>,
rows: Option<usize>,
) -> Self {
Self::new_read(
header,
buffer,
dictionary_page,
descriptor,
rows.map(|x| vec![Interval::new(0, x)]),
)
}

pub(crate) fn new_read(
header: DataPageHeader,
buffer: Vec<u8>,
dictionary_page: Option<Arc<dyn DictPage>>,
descriptor: Descriptor,
selected_rows: Option<Vec<Interval>>,
) -> Self {
Self {
header,
buffer,
dictionary_page,
descriptor,
rows,
selected_rows,
}
}

Expand All @@ -154,8 +192,8 @@ impl DataPage {

/// the rows to be selected by this page.
/// When `None`, all rows are to be considered.
pub fn rows(&self) -> Option<(usize, usize)> {
self.rows
pub fn selected_rows(&self) -> Option<&[Interval]> {
self.selected_rows.as_deref()
}

/// Returns a mutable reference to the internal buffer.
Expand Down Expand Up @@ -246,9 +284,9 @@ impl CompressedPage {
}
}

pub(crate) fn rows(&self) -> Option<(usize, usize)> {
pub(crate) fn selected_rows(&self) -> Option<&[Interval]> {
match self {
CompressedPage::Data(page) => page.rows,
CompressedPage::Data(page) => page.selected_rows(),
CompressedPage::Dict(_) => None,
}
}
Expand Down
Loading

0 comments on commit 2cea410

Please sign in to comment.