diff --git a/src/deserialize/hybrid_rle.rs b/src/deserialize/hybrid_rle.rs index fd447897e..deadd8d5a 100644 --- a/src/deserialize/hybrid_rle.rs +++ b/src/deserialize/hybrid_rle.rs @@ -1,6 +1,7 @@ -use crate::encoding::hybrid_rle; +use crate::encoding::hybrid_rle::{self, BitmapIter}; -#[derive(Debug, PartialEq, Eq)] +/// The decoding state of the hybrid-RLE decoder with a maximum definition level of 1 +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum HybridEncoded<'a> { /// a bitmap Bitmap(&'a [u8], usize, usize), @@ -9,10 +10,15 @@ pub enum HybridEncoded<'a> { Repeated(bool, usize), } -/// An iterator of [`HybridEncoded`], adapter over [`hybrid_rle::HybridEncoded`], -/// specialized to be consumed into bitmaps. +pub trait HybridRleRunsIterator<'a>: Iterator> { + /// Number of elements remaining. This may not be the items of the iterator - an item + /// of the iterator may contain more than one element. + fn number_of_elements(&self) -> usize; +} + +/// An iterator of [`HybridEncoded`], adapter over [`hybrid_rle::HybridEncoded`]. #[derive(Debug, Clone)] -pub struct HybridBitmapIter<'a, I: Iterator>> { +pub struct HybridRleIter<'a, I: Iterator>> { iter: I, current: Option>, // invariants: @@ -27,8 +33,8 @@ pub struct HybridBitmapIter<'a, I: Iterator length: usize, } -impl<'a, I: Iterator>> HybridBitmapIter<'a, I> { - /// Returns a new [`HybridBitmapIter`] +impl<'a, I: Iterator>> HybridRleIter<'a, I> { + /// Returns a new [`HybridRleIter`] #[inline] pub fn new(mut iter: I, length: usize) -> Self { let current = iter.next(); @@ -41,7 +47,7 @@ impl<'a, I: Iterator>> HybridBitmapIter<'a, } } - /// the number of elements in the iterator + /// the number of elements in the iterator. Note that this _is not_ the number of runs. #[inline] pub fn len(&self) -> usize { self.length - self.consumed @@ -53,7 +59,7 @@ impl<'a, I: Iterator>> HybridBitmapIter<'a, } /// fetches the next bitmap, optionally limited. - /// When limited, a run of the hybrid may return an offsetted bitmap + /// When limited, a run may return an offsetted bitmap pub fn limited_next(&mut self, limit: Option) -> Option> { if self.consumed == self.length { return None; @@ -118,10 +124,94 @@ impl<'a, I: Iterator>> HybridBitmapIter<'a, } } -impl<'a, I: Iterator>> Iterator for HybridBitmapIter<'a, I> { +impl<'a, I: Iterator>> HybridRleRunsIterator<'a> + for HybridRleIter<'a, I> +{ + fn number_of_elements(&self) -> usize { + self.len() + } +} + +impl<'a, I: Iterator>> Iterator for HybridRleIter<'a, I> { type Item = HybridEncoded<'a>; + #[inline] fn next(&mut self) -> Option { self.limited_next(None) } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } } + +/// Type definition for a [`HybridRleIter`] using [`hybrid_rle::Decoder`]. +pub type HybridDecoderBitmapIter<'a> = HybridRleIter<'a, hybrid_rle::Decoder<'a>>; + +#[derive(Debug)] +enum HybridBooleanState<'a> { + /// a bitmap + Bitmap(BitmapIter<'a>), + /// A repeated item. The first attribute corresponds to whether the value is set + /// the second attribute corresponds to the number of repetitions. + Repeated(bool, usize), +} + +/// An iterator adapter that maps an iterator of [`HybridEncoded`] into an iterator +/// over [`bool`]. +#[derive(Debug)] +pub struct HybridRleBooleanIter<'a, I: Iterator>> { + iter: I, + current_run: Option>, +} + +impl<'a, I: Iterator>> HybridRleBooleanIter<'a, I> { + pub fn new(iter: I) -> Self { + Self { + iter, + current_run: None, + } + } +} + +impl<'a, I: HybridRleRunsIterator<'a>> Iterator for HybridRleBooleanIter<'a, I> { + type Item = bool; + + #[inline] + fn next(&mut self) -> Option { + if let Some(run) = &mut self.current_run { + match run { + HybridBooleanState::Bitmap(bitmap) => bitmap.next(), + HybridBooleanState::Repeated(value, remaining) => { + if *remaining == 0 { + None + } else { + *remaining -= 1; + Some(*value) + } + } + } + } else if let Some(run) = self.iter.next() { + self.current_run = Some(match run { + HybridEncoded::Bitmap(bitmap, offset, length) => { + HybridBooleanState::Bitmap(BitmapIter::new(bitmap, offset, length)) + } + HybridEncoded::Repeated(value, length) => { + HybridBooleanState::Repeated(value, length) + } + }); + self.next() + } else { + None + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + let exact = self.iter.number_of_elements(); + (exact, Some(exact)) + } +} + +/// Type definition for a [`HybridRleBooleanIter`] using [`hybrid_rle::Decoder`]. +pub type HybridRleDecoderIter<'a> = HybridRleBooleanIter<'a, HybridDecoderBitmapIter<'a>>; diff --git a/src/deserialize/mod.rs b/src/deserialize/mod.rs index 36866d84c..f17bdd3f8 100644 --- a/src/deserialize/mod.rs +++ b/src/deserialize/mod.rs @@ -10,4 +10,4 @@ pub use boolean::*; pub use fixed_len::*; pub use hybrid_rle::*; pub use native::*; -pub use utils::{DefLevelsDecoder, HybridDecoderBitmapIter}; +pub use utils::{DefLevelsDecoder, OptionalValues, SliceFilteredIter}; diff --git a/src/deserialize/utils.rs b/src/deserialize/utils.rs index 51e4b6b8f..c479d258d 100644 --- a/src/deserialize/utils.rs +++ b/src/deserialize/utils.rs @@ -1,10 +1,13 @@ +use std::collections::VecDeque; + use crate::{ encoding::hybrid_rle::{self, HybridRleDecoder}, + indexes::Interval, page::{split_buffer, DataPage}, read::levels::get_bit_width, }; -use super::hybrid_rle::HybridBitmapIter; +use super::hybrid_rle::{HybridDecoderBitmapIter, HybridRleIter}; pub(super) fn dict_indices_decoder(page: &DataPage) -> hybrid_rle::HybridRleDecoder { let (_, _, indices_buffer) = split_buffer(page); @@ -17,9 +20,6 @@ pub(super) fn dict_indices_decoder(page: &DataPage) -> hybrid_rle::HybridRleDeco hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, page.num_values()) } -/// Type definition for a [`HybridBitmapIter`] -pub type HybridDecoderBitmapIter<'a> = HybridBitmapIter<'a, hybrid_rle::Decoder<'a>>; - /// Decoder of definition levels. #[derive(Debug)] pub enum DefLevelsDecoder<'a> { @@ -27,7 +27,7 @@ pub enum DefLevelsDecoder<'a> { /// the bitpacked runs are bitmaps. This variant contains [`HybridDecoderBitmapIter`] /// that decodes the runs, but not the individual values Bitmap(HybridDecoderBitmapIter<'a>), - /// When the maximum definition level is 1, + /// When the maximum definition level is larger than 1 Levels(HybridRleDecoder<'a>, u32), } @@ -38,7 +38,7 @@ impl<'a> DefLevelsDecoder<'a> { let max_def_level = page.descriptor.max_def_level; if max_def_level == 1 { let iter = hybrid_rle::Decoder::new(def_levels, 1); - let iter = HybridBitmapIter::new(iter, page.num_values()); + let iter = HybridRleIter::new(iter, page.num_values()); Self::Bitmap(iter) } else { let iter = @@ -47,3 +47,109 @@ impl<'a> DefLevelsDecoder<'a> { } } } + +/// Iterator adapter to convert an iterator of non-null values and an iterator over validity +/// into an iterator of optional values. +#[derive(Debug, Clone)] +pub struct OptionalValues, I: Iterator> { + validity: V, + values: I, +} + +impl, I: Iterator> OptionalValues { + pub fn new(validity: V, values: I) -> Self { + Self { validity, values } + } +} + +impl, I: Iterator> Iterator for OptionalValues { + type Item = Option; + + #[inline] + fn next(&mut self) -> Option { + self.validity + .next() + .map(|x| if x { self.values.next() } else { None }) + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.validity.size_hint() + } +} + +/// An iterator adapter that converts an iterator over items into an iterator over slices of +/// those N items. +/// +/// This iterator is best used with iterators that implement `nth` since skipping items +/// allows this iterator to skip sequences of items without having to call each of them. +#[derive(Debug, Clone)] +pub struct SliceFilteredIter { + iter: I, + selected_rows: VecDeque, + current_remaining: usize, + current: usize, // position in the slice +} + +impl SliceFilteredIter { + /// Return a new [`SliceFilteredIter`] + pub fn new(iter: I, selected_rows: VecDeque) -> Self { + Self { + iter, + selected_rows, + current_remaining: 0, + current: 0, + } + } +} + +impl> Iterator for SliceFilteredIter { + type Item = T; + + #[inline] + fn next(&mut self) -> Option { + if self.current_remaining == 0 { + if let Some(interval) = self.selected_rows.pop_front() { + // skip the hole between the previous start end this start + // (start + length) - start + let item = self.iter.nth(interval.start - self.current); + self.current = interval.start + interval.length; + self.current_remaining = interval.length - 1; + item + } else { + None + } + } else { + self.current_remaining -= 1; + self.iter.next() + } + } +} + +#[cfg(test)] +mod test { + use std::collections::VecDeque; + + use super::*; + + #[test] + fn basic() { + let iter = 0..=100; + + let intervals = vec![ + Interval::new(0, 2), + Interval::new(20, 11), + Interval::new(31, 1), + ]; + + let a: VecDeque = intervals.clone().into_iter().collect(); + let a = SliceFilteredIter::new(iter, a); + + let expected: Vec = intervals + .into_iter() + .flat_map(|interval| interval.start..(interval.start + interval.length)) + .collect(); + + assert_eq!(expected, a.collect::>()); + } +} diff --git a/src/indexes/intervals.rs b/src/indexes/intervals.rs index 6bff226c3..9af3da75e 100644 --- a/src/indexes/intervals.rs +++ b/src/indexes/intervals.rs @@ -6,14 +6,14 @@ use crate::error::Error; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct Interval { /// Its start - pub start: u64, + pub start: usize, /// Its length - pub length: u64, + pub length: usize, } impl Interval { /// Create a new interal - pub fn new(start: u64, length: u64) -> Self { + pub fn new(start: usize, length: usize) -> Self { Self { start, length } } } @@ -21,15 +21,14 @@ impl Interval { /// Returns the set of (row) intervals of the pages. fn compute_page_row_intervals( locations: &[PageLocation], - num_rows: u64, + num_rows: usize, ) -> Result, Error> { if locations.is_empty() { return Ok(vec![]); }; let last = (|| { - let first = locations.last().unwrap().first_row_index; - let start = u64::try_from(first)?; + let start: usize = locations.last().unwrap().first_row_index.try_into()?; let length = num_rows - start; Result::<_, Error>::Ok(Interval::new(start, length)) })(); @@ -37,8 +36,8 @@ fn compute_page_row_intervals( let pages_lengths = locations .windows(2) .map(|x| { - let start = u64::try_from(x[0].first_row_index)?; - let length = u64::try_from(x[1].first_row_index - x[0].first_row_index)?; + let start = usize::try_from(x[0].first_row_index)?; + let length = usize::try_from(x[1].first_row_index - x[0].first_row_index)?; Ok(Interval::new(start, length)) }) .chain(std::iter::once(last)); @@ -50,7 +49,7 @@ fn compute_page_row_intervals( pub fn compute_rows( selected: &[bool], locations: &[PageLocation], - num_rows: u64, + num_rows: usize, ) -> Result, Error> { let page_intervals = compute_page_row_intervals(locations, num_rows)?; @@ -70,54 +69,40 @@ pub fn compute_rows( } /// An enum describing a page that was either selected in a filter pushdown or skipped -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum FilteredPage { - Select { - /// Location of the page in the file - start: u64, - length: usize, - /// Location of rows to select in the page - rows_offset: usize, - rows_length: usize, - }, - Skip { - /// Location of the page in the file - start: u64, - length: usize, - /// number of rows that are skip by skipping this page - num_rows: usize, - }, -} - -impl FilteredPage { - pub fn start(&self) -> u64 { - match self { - Self::Select { start, .. } => *start, - Self::Skip { start, .. } => *start, - } - } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct FilteredPage { + /// Location of the page in the file + pub start: u64, + pub length: usize, + /// rows to select from the page + pub selected_rows: Vec, + pub num_rows: usize, } -fn is_in(probe: Interval, intervals: &[Interval]) -> Option { - intervals.iter().find_map(|interval| { - let interval_end = interval.start + interval.length; - let probe_end = probe.start + probe.length; - let overlaps = (probe.start < interval_end) && (probe_end > interval.start); - if overlaps { - let start = interval.start.max(probe.start); - let end = interval_end.min(probe_end); - Some(Interval::new(start - probe.start, end - start)) - } else { - None - } - }) +fn is_in(probe: Interval, intervals: &[Interval]) -> Vec { + intervals + .iter() + .filter_map(|interval| { + let interval_end = interval.start + interval.length; + let probe_end = probe.start + probe.length; + let overlaps = (probe.start < interval_end) && (probe_end > interval.start); + if overlaps { + let start = interval.start.max(probe.start); + let end = interval_end.min(probe_end); + Some(Interval::new(start - probe.start, end - start)) + } else { + None + } + }) + .collect() } -/// Given a set of selected [Interval]s of rows and the set of page locations, returns the +/// Given a set of selected [Interval]s of rows and the set of [`PageLocation`], returns the +/// a set of [`FilteredPage`] with the same number of items as `locations`. pub fn select_pages( intervals: &[Interval], locations: &[PageLocation], - num_rows: u64, + num_rows: usize, ) -> Result, Error> { let page_intervals = compute_page_row_intervals(locations, num_rows)?; @@ -125,19 +110,12 @@ pub fn select_pages( .into_iter() .zip(locations.iter()) .map(|(interval, location)| { - Ok(if let Some(overlap) = is_in(interval, intervals) { - FilteredPage::Select { - start: location.offset.try_into()?, - length: location.compressed_page_size.try_into()?, - rows_offset: overlap.start.try_into()?, - rows_length: overlap.length.try_into()?, - } - } else { - FilteredPage::Skip { - start: location.offset.try_into()?, - length: location.compressed_page_size.try_into()?, - num_rows: interval.length.try_into()?, - } + let selected_rows = is_in(interval, intervals); + Ok(FilteredPage { + start: location.offset.try_into()?, + length: location.compressed_page_size.try_into()?, + selected_rows, + num_rows: interval.length, }) }) .collect() diff --git a/src/indexes/mod.rs b/src/indexes/mod.rs index e0b80dcdf..f1a9361ed 100644 --- a/src/indexes/mod.rs +++ b/src/indexes/mod.rs @@ -76,16 +76,17 @@ mod tests { assert_eq!( pages, vec![ - FilteredPage::Skip { + FilteredPage { start: 100, length: 10, + selected_rows: vec![], num_rows: 5 }, - FilteredPage::Select { + FilteredPage { start: 110, length: 20, - rows_offset: 0, - rows_length: 5 + selected_rows: vec![Interval::new(0, 5)], + num_rows: 5 } ] ); @@ -114,15 +115,16 @@ mod tests { assert_eq!( pages, vec![ - FilteredPage::Select { + FilteredPage { start: 100, length: 20, - rows_offset: 5, - rows_length: 5 + selected_rows: vec![Interval::new(5, 5)], + num_rows: 10, }, - FilteredPage::Skip { + FilteredPage { start: 120, length: 20, + selected_rows: vec![], num_rows: 90 }, ] @@ -158,21 +160,22 @@ mod tests { assert_eq!( pages, vec![ - FilteredPage::Select { + FilteredPage { start: 100, length: 20, - rows_offset: 5, - rows_length: 5 + selected_rows: vec![Interval::new(5, 5)], + num_rows: 10, }, - FilteredPage::Select { + FilteredPage { start: 120, length: 20, - rows_offset: 0, - rows_length: 1 + selected_rows: vec![Interval::new(0, 1)], + num_rows: 90, }, - FilteredPage::Skip { + FilteredPage { start: 140, length: 20, + selected_rows: vec![], num_rows: 100 }, ] @@ -208,20 +211,22 @@ mod tests { assert_eq!( pages, vec![ - FilteredPage::Select { + FilteredPage { start: 100, length: 20, - rows_offset: 0, - rows_length: 1 + selected_rows: vec![Interval::new(0, 1)], + num_rows: 10, }, - FilteredPage::Skip { + FilteredPage { start: 120, length: 20, + selected_rows: vec![], num_rows: 90 }, - FilteredPage::Skip { + FilteredPage { start: 140, length: 20, + selected_rows: vec![], num_rows: 100 }, ] diff --git a/src/page/mod.rs b/src/page/mod.rs index 4333ee45c..6cbb57608 100644 --- a/src/page/mod.rs +++ b/src/page/mod.rs @@ -7,6 +7,7 @@ pub use parquet_format_async_temp::{ DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, PageHeader as ParquetPageHeader, }; +use crate::indexes::Interval; pub use crate::parquet_bridge::{DataPageHeaderExt, PageType}; use crate::compression::Compression; @@ -28,7 +29,7 @@ pub struct CompressedDataPage { pub(crate) descriptor: Descriptor, // The offset and length in rows - pub(crate) rows: Option<(usize, usize)>, + pub(crate) selected_rows: Option>, } impl CompressedDataPage { @@ -40,7 +41,28 @@ impl CompressedDataPage { uncompressed_page_size: usize, dictionary_page: Option>, descriptor: Descriptor, - rows: Option<(usize, usize)>, + rows: Option, + ) -> Self { + Self::new_read( + header, + buffer, + compression, + uncompressed_page_size, + dictionary_page, + descriptor, + rows.map(|x| vec![Interval::new(0, x)]), + ) + } + + /// Returns a new [`CompressedDataPage`]. + pub(crate) fn new_read( + header: DataPageHeader, + buffer: Vec, + compression: Compression, + uncompressed_page_size: usize, + dictionary_page: Option>, + descriptor: Descriptor, + selected_rows: Option>, ) -> Self { Self { header, @@ -49,7 +71,7 @@ impl CompressedDataPage { uncompressed_page_size, dictionary_page, descriptor, - rows, + selected_rows, } } @@ -74,8 +96,8 @@ impl CompressedDataPage { /// the rows to be selected by this page. /// When `None`, all rows are to be considered. - pub fn rows(&self) -> Option<(usize, usize)> { - self.rows + pub fn selected_rows(&self) -> Option<&[Interval]> { + self.selected_rows.as_deref() } pub fn num_values(&self) -> usize { @@ -120,7 +142,7 @@ pub struct DataPage { pub(super) buffer: Vec, pub(super) dictionary_page: Option>, pub descriptor: Descriptor, - pub rows: Option<(usize, usize)>, + pub selected_rows: Option>, } impl DataPage { @@ -129,14 +151,30 @@ impl DataPage { buffer: Vec, dictionary_page: Option>, descriptor: Descriptor, - rows: Option<(usize, usize)>, + rows: Option, + ) -> Self { + Self::new_read( + header, + buffer, + dictionary_page, + descriptor, + rows.map(|x| vec![Interval::new(0, x)]), + ) + } + + pub(crate) fn new_read( + header: DataPageHeader, + buffer: Vec, + dictionary_page: Option>, + descriptor: Descriptor, + selected_rows: Option>, ) -> Self { Self { header, buffer, dictionary_page, descriptor, - rows, + selected_rows, } } @@ -154,8 +192,8 @@ impl DataPage { /// the rows to be selected by this page. /// When `None`, all rows are to be considered. - pub fn rows(&self) -> Option<(usize, usize)> { - self.rows + pub fn selected_rows(&self) -> Option<&[Interval]> { + self.selected_rows.as_deref() } /// Returns a mutable reference to the internal buffer. @@ -246,9 +284,9 @@ impl CompressedPage { } } - pub(crate) fn rows(&self) -> Option<(usize, usize)> { + pub(crate) fn selected_rows(&self) -> Option<&[Interval]> { match self { - CompressedPage::Data(page) => page.rows, + CompressedPage::Data(page) => page.selected_rows(), CompressedPage::Dict(_) => None, } } diff --git a/src/read/compression.rs b/src/read/compression.rs index 22e19de03..002a6fa4a 100644 --- a/src/read/compression.rs +++ b/src/read/compression.rs @@ -85,12 +85,12 @@ pub fn decompress( buffer: &mut Vec, ) -> Result { decompress_buffer(&mut compressed_page, buffer)?; - Ok(DataPage::new( + Ok(DataPage::new_read( compressed_page.header, std::mem::take(buffer), compressed_page.dictionary_page, compressed_page.descriptor, - compressed_page.rows, + compressed_page.selected_rows, )) } @@ -101,12 +101,12 @@ fn decompress_reuse( ) -> Result<(DataPage, bool)> { let was_decompressed = decompress_buffer(&mut compressed_page, buffer)?; - let new_page = DataPage::new( + let new_page = DataPage::new_read( compressed_page.header, std::mem::take(buffer), compressed_page.dictionary_page, compressed_page.descriptor, - compressed_page.rows, + compressed_page.selected_rows, ); if was_decompressed { diff --git a/src/read/page/indexed_reader.rs b/src/read/page/indexed_reader.rs index c799e5d15..c4d1d97ab 100644 --- a/src/read/page/indexed_reader.rs +++ b/src/read/page/indexed_reader.rs @@ -6,7 +6,7 @@ use std::{ use crate::{ error::Error, - indexes::FilteredPage, + indexes::{FilteredPage, Interval}, metadata::{ColumnChunkMetaData, Descriptor}, page::{CompressedDataPage, DictPage, ParquetPageHeader}, parquet_bridge::Compression, @@ -115,7 +115,7 @@ impl IndexedPageReader { // the column let dictionary = match pages.get(0) { Some(page) => { - let length = (page.start() - column_start) as usize; + let length = (page.start - column_start) as usize; if length > 0 { Some(LazyDict::Range(column_start, length)) } else { @@ -146,7 +146,7 @@ impl IndexedPageReader { &mut self, start: u64, length: usize, - rows: (usize, usize), + selected_rows: Vec, ) -> Result { // it will be read - take buffer let mut data = std::mem::take(&mut self.data_buffer); @@ -187,7 +187,7 @@ impl IndexedPageReader { self.compression, &dict, &self.descriptor, - Some(rows), + Some(selected_rows), ) } } @@ -197,25 +197,19 @@ impl Iterator for IndexedPageReader { fn next(&mut self) -> Option { if let Some(page) = self.pages.pop_front() { - match page { - FilteredPage::Select { - start, - length, - rows_offset, - rows_length, - } => { - let page = match self.read_page(start, length, (rows_offset, rows_length)) { - Err(e) => return Some(Err(e)), - Ok(header) => header, - }; - match page { - FinishedPage::Data(page) => Some(Ok(page)), - FinishedPage::Dict(_) => Some(Err(Error::OutOfSpec( - "Dictionary pages cannot be selected via indexes".to_string(), - ))), - } + if page.selected_rows.is_empty() { + self.next() + } else { + let page = match self.read_page(page.start, page.length, page.selected_rows) { + Err(e) => return Some(Err(e)), + Ok(header) => header, + }; + match page { + FinishedPage::Data(page) => Some(Ok(page)), + FinishedPage::Dict(_) => Some(Err(Error::OutOfSpec( + "Dictionary pages cannot be selected via indexes".to_string(), + ))), } - FilteredPage::Skip { .. } => self.next(), } } else { None diff --git a/src/read/page/reader.rs b/src/read/page/reader.rs index 2308d40b4..3c9fdc1bf 100644 --- a/src/read/page/reader.rs +++ b/src/read/page/reader.rs @@ -5,6 +5,7 @@ use parquet_format_async_temp::thrift::protocol::TCompactInputProtocol; use crate::compression::Compression; use crate::error::Result; +use crate::indexes::Interval; use crate::metadata::{ColumnChunkMetaData, Descriptor}; use crate::page::{ @@ -181,7 +182,7 @@ pub(super) fn finish_page( compression: Compression, current_dictionary: &Option>, descriptor: &Descriptor, - rows: Option<(usize, usize)>, + selected_rows: Option>, ) -> Result { let type_ = page_header.type_.try_into()?; match type_ { @@ -207,27 +208,27 @@ pub(super) fn finish_page( PageType::DataPage => { let header = page_header.data_page_header.unwrap(); - Ok(FinishedPage::Data(CompressedDataPage::new( + Ok(FinishedPage::Data(CompressedDataPage::new_read( DataPageHeader::V1(header), std::mem::take(data), compression, page_header.uncompressed_page_size as usize, current_dictionary.clone(), descriptor.clone(), - rows, + selected_rows, ))) } PageType::DataPageV2 => { let header = page_header.data_page_header_v2.unwrap(); - Ok(FinishedPage::Data(CompressedDataPage::new( + Ok(FinishedPage::Data(CompressedDataPage::new_read( DataPageHeader::V2(header), std::mem::take(data), compression, page_header.uncompressed_page_size as usize, current_dictionary.clone(), descriptor.clone(), - rows, + selected_rows, ))) } } diff --git a/src/write/compression.rs b/src/write/compression.rs index d6c44a727..9821fce46 100644 --- a/src/write/compression.rs +++ b/src/write/compression.rs @@ -18,7 +18,7 @@ fn compress_data( header, dictionary_page, descriptor, - rows, + selected_rows, } = page; let uncompressed_page_size = buffer.len(); if compression != Compression::Uncompressed { @@ -41,14 +41,14 @@ fn compress_data( } else { std::mem::swap(&mut buffer, &mut compressed_buffer); }; - Ok(CompressedDataPage::new( + Ok(CompressedDataPage::new_read( header, compressed_buffer, compression, uncompressed_page_size, dictionary_page, descriptor, - rows, + selected_rows, )) } diff --git a/src/write/page.rs b/src/write/page.rs index 71c025af0..fbc38d585 100644 --- a/src/write/page.rs +++ b/src/write/page.rs @@ -53,7 +53,7 @@ pub fn write_page( compressed_page: &CompressedPage, ) -> Result { let num_values = compressed_page.num_values(); - let rows = compressed_page.rows(); + let selected_rows = compressed_page.selected_rows(); let header = match &compressed_page { CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page), @@ -85,7 +85,7 @@ pub fn write_page( offset, bytes_written, statistics, - num_rows: rows.map(|x| x.1), + num_rows: selected_rows.map(|x| x.last().unwrap().length), num_values, }) } @@ -96,7 +96,7 @@ pub async fn write_page_async( compressed_page: &CompressedPage, ) -> Result { let num_values = compressed_page.num_values(); - let rows = compressed_page.rows(); + let selected_rows = compressed_page.selected_rows(); let header = match &compressed_page { CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page), @@ -128,7 +128,7 @@ pub async fn write_page_async( offset, bytes_written, statistics, - num_rows: rows.map(|x| x.1), + num_rows: selected_rows.map(|x| x.last().unwrap().length), num_values, }) } diff --git a/tests/it/read/primitive.rs b/tests/it/read/primitive.rs index 22f56b078..d1de8692a 100644 --- a/tests/it/read/primitive.rs +++ b/tests/it/read/primitive.rs @@ -1,23 +1,107 @@ -use parquet2::{deserialize::NativePageState, error::Error, page::DataPage, types::NativeType}; +use parquet2::{ + deserialize::{ + native_cast, Casted, HybridRleDecoderIter, HybridRleIter, NativePageState, OptionalValues, + SliceFilteredIter, + }, + encoding::{hybrid_rle::Decoder, Encoding}, + error::Error, + page::{split_buffer, DataPage}, + schema::Repetition, + types::NativeType, +}; use super::utils::deserialize_optional; +/// The deserialization state of a `DataPage` of `Primitive` parquet primitive type +#[derive(Debug)] +pub enum FilteredPageState<'a, T> +where + T: NativeType, +{ + /// A page of optional values + Optional(SliceFilteredIter, Casted<'a, T>>>), + /// A page of required values + Required(SliceFilteredIter>), +} + +/// The deserialization state of a `DataPage` of `Primitive` parquet primitive type +#[derive(Debug)] +pub enum PageState<'a, T> +where + T: NativeType, +{ + Nominal(NativePageState<'a, T>), + Filtered(FilteredPageState<'a, T>), +} + +impl<'a, T: NativeType> PageState<'a, T> { + /// Tries to create [`NativePageState`] + /// # Error + /// Errors iff the page is not a `NativePageState` + pub fn try_new(page: &'a DataPage) -> Result { + if let Some(selected_rows) = page.selected_rows() { + let is_optional = + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; + + match (page.encoding(), page.dictionary_page(), is_optional) { + (Encoding::Plain, _, true) => { + let (_, def_levels, _) = split_buffer(page); + + let validity = HybridRleDecoderIter::new(HybridRleIter::new( + Decoder::new(def_levels, 1), + page.num_values(), + )); + let values = native_cast(page)?; + + // validity and values interleaved. + let values = OptionalValues::new(validity, values); + + let values = + SliceFilteredIter::new(values, selected_rows.iter().copied().collect()); + + Ok(Self::Filtered(FilteredPageState::Optional(values))) + } + (Encoding::Plain, _, false) => { + let values = SliceFilteredIter::new( + native_cast(page)?, + selected_rows.iter().copied().collect(), + ); + Ok(Self::Filtered(FilteredPageState::Required(values))) + } + _ => Err(Error::General(format!( + "Viewing page for encoding {:?} for native type {} not supported", + page.encoding(), + std::any::type_name::() + ))), + } + } else { + NativePageState::try_new(page).map(Self::Nominal) + } + } +} + pub fn page_to_vec(page: &DataPage) -> Result>, Error> { assert_eq!(page.descriptor.max_rep_level, 0); - let state = NativePageState::::try_new(page)?; + let state = PageState::::try_new(page)?; match state { - NativePageState::Optional(validity, values) => deserialize_optional(validity, values), - NativePageState::Required(values) => Ok(values.map(Some).collect()), - NativePageState::RequiredDictionary(dict) => Ok(dict - .indexes - .map(|x| x as usize) - .map(|x| dict.values[x]) - .map(Some) - .collect()), - NativePageState::OptionalDictionary(validity, dict) => { - let values = dict.indexes.map(|x| x as usize).map(|x| dict.values[x]); - deserialize_optional(validity, values) - } + PageState::Nominal(state) => match state { + NativePageState::Optional(validity, values) => deserialize_optional(validity, values), + NativePageState::Required(values) => Ok(values.map(Some).collect()), + NativePageState::RequiredDictionary(dict) => Ok(dict + .indexes + .map(|x| x as usize) + .map(|x| dict.values[x]) + .map(Some) + .collect()), + NativePageState::OptionalDictionary(validity, dict) => { + let values = dict.indexes.map(|x| x as usize).map(|x| dict.values[x]); + deserialize_optional(validity, values) + } + }, + PageState::Filtered(state) => match state { + FilteredPageState::Optional(values) => Ok(values.collect()), + FilteredPageState::Required(values) => Ok(values.map(Some).collect()), + }, } } diff --git a/tests/it/write/binary.rs b/tests/it/write/binary.rs index 40f733006..4567aa721 100644 --- a/tests/it/write/binary.rs +++ b/tests/it/write/binary.rs @@ -84,6 +84,6 @@ pub fn array_to_page_v1( buffer, None, descriptor.clone(), - Some((0, array.len())), + Some(array.len()), ))) } diff --git a/tests/it/write/indexes.rs b/tests/it/write/indexes.rs new file mode 100644 index 000000000..b423951f0 --- /dev/null +++ b/tests/it/write/indexes.rs @@ -0,0 +1,139 @@ +use std::io::Cursor; + +use parquet2::compression::Compression; +use parquet2::error::Result; +use parquet2::indexes::{ + select_pages, BoundaryOrder, Index, Interval, NativeIndex, PageIndex, PageLocation, +}; +use parquet2::metadata::SchemaDescriptor; +use parquet2::read::{ + read_columns_indexes, read_metadata, read_pages_locations, BasicDecompressor, IndexedPageReader, +}; +use parquet2::schema::types::{ParquetType, PhysicalType, PrimitiveType}; +use parquet2::write::WriteOptions; +use parquet2::write::{Compressor, DynIter, DynStreamingIterator, FileWriter, Version}; +use parquet2::FallibleStreamingIterator; + +use crate::read::page_to_array; +use crate::Array; + +use super::primitive::array_to_page_v1; + +fn write_file() -> Result> { + let page1 = vec![Some(0), Some(1), None, Some(3), Some(4), Some(5), Some(6)]; + let page2 = vec![Some(10), Some(11)]; + + let options = WriteOptions { + write_statistics: true, + compression: Compression::Uncompressed, + version: Version::V1, + }; + + let schema = SchemaDescriptor::new( + "schema".to_string(), + vec![ParquetType::from_physical( + "col1".to_string(), + PhysicalType::Int32, + )], + ); + + let pages = vec![ + array_to_page_v1::(&page1, &options, &schema.columns()[0].descriptor), + array_to_page_v1::(&page2, &options, &schema.columns()[0].descriptor), + ]; + + let pages = DynStreamingIterator::new(Compressor::new( + DynIter::new(pages.into_iter()), + options.compression, + vec![], + )); + let columns = std::iter::once(Ok(pages)); + + let writer = Cursor::new(vec![]); + let mut writer = FileWriter::new(writer, schema, options, None); + + writer.start()?; + writer.write(DynIter::new(columns))?; + let writer = writer.end(None)?.1; + + Ok(writer.into_inner()) +} + +#[test] +fn read_indexed_page() -> Result<()> { + let data = write_file()?; + let mut reader = Cursor::new(data); + + let metadata = read_metadata(&mut reader)?; + + let column = 0; + let columns = &metadata.row_groups[0].columns(); + + // selected the rows + let intervals = &[Interval::new(2, 2)]; + + let pages = read_pages_locations(&mut reader, columns)?; + + let pages = select_pages(intervals, &pages[column], metadata.row_groups[0].num_rows())?; + + let pages = IndexedPageReader::new(reader, &columns[column], pages, vec![], vec![]); + + let mut pages = BasicDecompressor::new(pages, vec![]); + + let mut arrays = vec![]; + while let Some(page) = pages.next()? { + arrays.push(page_to_array(page)?) + } + + // the second item and length 2 + assert_eq!(arrays, vec![Array::Int32(vec![None, Some(3)])]); + + Ok(()) +} + +#[test] +fn read_indexes_and_locations() -> Result<()> { + let data = write_file()?; + let mut reader = Cursor::new(data); + + let metadata = read_metadata(&mut reader)?; + + let columns = &metadata.row_groups[0].columns(); + + let expected_page_locations = vec![vec![ + PageLocation { + offset: 4, + compressed_page_size: 63, + first_row_index: 0, + }, + PageLocation { + offset: 67, + compressed_page_size: 47, + first_row_index: 7, + }, + ]]; + let expected_index = vec![Box::new(NativeIndex:: { + primitive_type: PrimitiveType::from_physical("col1".to_string(), PhysicalType::Int32), + indexes: vec![ + PageIndex { + min: Some(0), + max: Some(6), + null_count: Some(1), + }, + PageIndex { + min: Some(10), + max: Some(11), + null_count: Some(0), + }, + ], + boundary_order: BoundaryOrder::Unordered, + }) as Box]; + + let indexes = read_columns_indexes(&mut reader, columns)?; + assert_eq!(&indexes, &expected_index); + + let pages = read_pages_locations(&mut reader, columns)?; + assert_eq!(pages, expected_page_locations); + + Ok(()) +} diff --git a/tests/it/write/mod.rs b/tests/it/write/mod.rs index d950ce68b..8035bfdd1 100644 --- a/tests/it/write/mod.rs +++ b/tests/it/write/mod.rs @@ -1,4 +1,5 @@ mod binary; +mod indexes; mod primitive; use std::io::{Cursor, Read, Seek}; @@ -6,10 +7,9 @@ use std::sync::Arc; use parquet2::compression::Compression; use parquet2::error::Result; -use parquet2::indexes::{BoundaryOrder, Index, NativeIndex, PageIndex, PageLocation}; use parquet2::metadata::SchemaDescriptor; -use parquet2::read::{read_columns_indexes, read_metadata, read_pages_locations}; -use parquet2::schema::types::{ParquetType, PhysicalType, PrimitiveType}; +use parquet2::read::read_metadata; +use parquet2::schema::types::{ParquetType, PhysicalType}; use parquet2::statistics::Statistics; use parquet2::write::FileStreamer; use parquet2::write::{Compressor, DynIter, DynStreamingIterator, FileWriter, Version}; @@ -227,89 +227,6 @@ fn basic() -> Result<()> { Ok(()) } -#[test] -fn indexes() -> Result<()> { - let array1 = vec![Some(0), Some(1), None, Some(3), Some(4), Some(5), Some(6)]; - let array2 = vec![Some(10), Some(11)]; - - let options = WriteOptions { - write_statistics: true, - compression: Compression::Uncompressed, - version: Version::V1, - }; - - let schema = SchemaDescriptor::new( - "schema".to_string(), - vec![ParquetType::from_physical( - "col".to_string(), - PhysicalType::Int32, - )], - ); - - let pages = vec![ - array_to_page_v1::(&array1, &options, &schema.columns()[0].descriptor), - array_to_page_v1::(&array2, &options, &schema.columns()[0].descriptor), - ]; - - let pages = DynStreamingIterator::new(Compressor::new( - DynIter::new(pages.into_iter()), - options.compression, - vec![], - )); - let columns = std::iter::once(Ok(pages)); - - let writer = Cursor::new(vec![]); - let mut writer = FileWriter::new(writer, schema, options, None); - - writer.start()?; - writer.write(DynIter::new(columns))?; - let writer = writer.end(None)?.1; - - let data = writer.into_inner(); - let mut reader = Cursor::new(data); - - let metadata = read_metadata(&mut reader)?; - - let columns = &metadata.row_groups[0].columns(); - - let expected_page_locations = vec![vec![ - PageLocation { - offset: 4, - compressed_page_size: 63, - first_row_index: 0, - }, - PageLocation { - offset: 67, - compressed_page_size: 47, - first_row_index: array1.len() as i64, - }, - ]]; - let expected_index = vec![Box::new(NativeIndex:: { - primitive_type: PrimitiveType::from_physical("col".to_string(), PhysicalType::Int32), - indexes: vec![ - PageIndex { - min: Some(0), - max: Some(6), - null_count: Some(1), - }, - PageIndex { - min: Some(10), - max: Some(11), - null_count: Some(0), - }, - ], - boundary_order: BoundaryOrder::Unordered, - }) as Box]; - - let indexes = read_columns_indexes(&mut reader, columns)?; - assert_eq!(&indexes, &expected_index); - - let pages = read_pages_locations(&mut reader, columns)?; - assert_eq!(pages, expected_page_locations); - - Ok(()) -} - async fn test_column_async(column: &str) -> Result<()> { let array = alltypes_plain(column); diff --git a/tests/it/write/primitive.rs b/tests/it/write/primitive.rs index ecad35756..92a155c38 100644 --- a/tests/it/write/primitive.rs +++ b/tests/it/write/primitive.rs @@ -75,6 +75,6 @@ pub fn array_to_page_v1( buffer, None, descriptor.clone(), - Some((0, array.len())), + Some(array.len()), ))) }