Skip to content

Commit

Permalink
Fixed design in filtered pages (#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Apr 4, 2022
1 parent eea5bfd commit f8eb243
Show file tree
Hide file tree
Showing 16 changed files with 604 additions and 252 deletions.
110 changes: 100 additions & 10 deletions src/deserialize/hybrid_rle.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::encoding::hybrid_rle;
use crate::encoding::hybrid_rle::{self, BitmapIter};

#[derive(Debug, PartialEq, Eq)]
/// The decoding state of the hybrid-RLE decoder with a maximum definition level of 1
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HybridEncoded<'a> {
/// a bitmap
Bitmap(&'a [u8], usize, usize),
Expand All @@ -9,10 +10,15 @@ pub enum HybridEncoded<'a> {
Repeated(bool, usize),
}

/// An iterator of [`HybridEncoded`], adapter over [`hybrid_rle::HybridEncoded`],
/// specialized to be consumed into bitmaps.
pub trait HybridRleRunsIterator<'a>: Iterator<Item = HybridEncoded<'a>> {
/// Number of elements remaining. This may not be the items of the iterator - an item
/// of the iterator may contain more than one element.
fn number_of_elements(&self) -> usize;
}

/// An iterator of [`HybridEncoded`], adapter over [`hybrid_rle::HybridEncoded`].
#[derive(Debug, Clone)]
pub struct HybridBitmapIter<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> {
pub struct HybridRleIter<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> {
iter: I,
current: Option<hybrid_rle::HybridEncoded<'a>>,
// invariants:
Expand All @@ -27,8 +33,8 @@ pub struct HybridBitmapIter<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>
length: usize,
}

impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> HybridBitmapIter<'a, I> {
/// Returns a new [`HybridBitmapIter`]
impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> HybridRleIter<'a, I> {
/// Returns a new [`HybridRleIter`]
#[inline]
pub fn new(mut iter: I, length: usize) -> Self {
let current = iter.next();
Expand All @@ -41,7 +47,7 @@ impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> HybridBitmapIter<'a,
}
}

/// the number of elements in the iterator
/// the number of elements in the iterator. Note that this _is not_ the number of runs.
#[inline]
pub fn len(&self) -> usize {
self.length - self.consumed
Expand All @@ -53,7 +59,7 @@ impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> HybridBitmapIter<'a,
}

/// fetches the next bitmap, optionally limited.
/// When limited, a run of the hybrid may return an offsetted bitmap
/// When limited, a run may return an offsetted bitmap
pub fn limited_next(&mut self, limit: Option<usize>) -> Option<HybridEncoded<'a>> {
if self.consumed == self.length {
return None;
Expand Down Expand Up @@ -118,10 +124,94 @@ impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> HybridBitmapIter<'a,
}
}

impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> Iterator for HybridBitmapIter<'a, I> {
impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> HybridRleRunsIterator<'a>
for HybridRleIter<'a, I>
{
fn number_of_elements(&self) -> usize {
self.len()
}
}

impl<'a, I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>> Iterator for HybridRleIter<'a, I> {
type Item = HybridEncoded<'a>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.limited_next(None)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.iter.size_hint()
}
}

/// Type definition for a [`HybridRleIter`] using [`hybrid_rle::Decoder`].
pub type HybridDecoderBitmapIter<'a> = HybridRleIter<'a, hybrid_rle::Decoder<'a>>;

#[derive(Debug)]
enum HybridBooleanState<'a> {
/// a bitmap
Bitmap(BitmapIter<'a>),
/// A repeated item. The first attribute corresponds to whether the value is set
/// the second attribute corresponds to the number of repetitions.
Repeated(bool, usize),
}

/// An iterator adapter that maps an iterator of [`HybridEncoded`] into an iterator
/// over [`bool`].
#[derive(Debug)]
pub struct HybridRleBooleanIter<'a, I: Iterator<Item = HybridEncoded<'a>>> {
iter: I,
current_run: Option<HybridBooleanState<'a>>,
}

impl<'a, I: Iterator<Item = HybridEncoded<'a>>> HybridRleBooleanIter<'a, I> {
pub fn new(iter: I) -> Self {
Self {
iter,
current_run: None,
}
}
}

impl<'a, I: HybridRleRunsIterator<'a>> Iterator for HybridRleBooleanIter<'a, I> {
type Item = bool;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if let Some(run) = &mut self.current_run {
match run {
HybridBooleanState::Bitmap(bitmap) => bitmap.next(),
HybridBooleanState::Repeated(value, remaining) => {
if *remaining == 0 {
None
} else {
*remaining -= 1;
Some(*value)
}
}
}
} else if let Some(run) = self.iter.next() {
self.current_run = Some(match run {
HybridEncoded::Bitmap(bitmap, offset, length) => {
HybridBooleanState::Bitmap(BitmapIter::new(bitmap, offset, length))
}
HybridEncoded::Repeated(value, length) => {
HybridBooleanState::Repeated(value, length)
}
});
self.next()
} else {
None
}
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
let exact = self.iter.number_of_elements();
(exact, Some(exact))
}
}

/// Type definition for a [`HybridRleBooleanIter`] using [`hybrid_rle::Decoder`].
pub type HybridRleDecoderIter<'a> = HybridRleBooleanIter<'a, HybridDecoderBitmapIter<'a>>;
2 changes: 1 addition & 1 deletion src/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ pub use boolean::*;
pub use fixed_len::*;
pub use hybrid_rle::*;
pub use native::*;
pub use utils::{DefLevelsDecoder, HybridDecoderBitmapIter};
pub use utils::{DefLevelsDecoder, OptionalValues, SliceFilteredIter};
118 changes: 112 additions & 6 deletions src/deserialize/utils.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::collections::VecDeque;

use crate::{
encoding::hybrid_rle::{self, HybridRleDecoder},
indexes::Interval,
page::{split_buffer, DataPage},
read::levels::get_bit_width,
};

use super::hybrid_rle::HybridBitmapIter;
use super::hybrid_rle::{HybridDecoderBitmapIter, HybridRleIter};

pub(super) fn dict_indices_decoder(page: &DataPage) -> hybrid_rle::HybridRleDecoder {
let (_, _, indices_buffer) = split_buffer(page);
Expand All @@ -17,17 +20,14 @@ pub(super) fn dict_indices_decoder(page: &DataPage) -> hybrid_rle::HybridRleDeco
hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, page.num_values())
}

/// Type definition for a [`HybridBitmapIter`]
pub type HybridDecoderBitmapIter<'a> = HybridBitmapIter<'a, hybrid_rle::Decoder<'a>>;

/// Decoder of definition levels.
#[derive(Debug)]
pub enum DefLevelsDecoder<'a> {
/// When the maximum definition level is 1, the definition levels are RLE-encoded and
/// the bitpacked runs are bitmaps. This variant contains [`HybridDecoderBitmapIter`]
/// that decodes the runs, but not the individual values
Bitmap(HybridDecoderBitmapIter<'a>),
/// When the maximum definition level is 1,
/// When the maximum definition level is larger than 1
Levels(HybridRleDecoder<'a>, u32),
}

Expand All @@ -38,7 +38,7 @@ impl<'a> DefLevelsDecoder<'a> {
let max_def_level = page.descriptor.max_def_level;
if max_def_level == 1 {
let iter = hybrid_rle::Decoder::new(def_levels, 1);
let iter = HybridBitmapIter::new(iter, page.num_values());
let iter = HybridRleIter::new(iter, page.num_values());
Self::Bitmap(iter)
} else {
let iter =
Expand All @@ -47,3 +47,109 @@ impl<'a> DefLevelsDecoder<'a> {
}
}
}

/// Iterator adapter to convert an iterator of non-null values and an iterator over validity
/// into an iterator of optional values.
#[derive(Debug, Clone)]
pub struct OptionalValues<T, V: Iterator<Item = bool>, I: Iterator<Item = T>> {
validity: V,
values: I,
}

impl<T, V: Iterator<Item = bool>, I: Iterator<Item = T>> OptionalValues<T, V, I> {
pub fn new(validity: V, values: I) -> Self {
Self { validity, values }
}
}

impl<T, V: Iterator<Item = bool>, I: Iterator<Item = T>> Iterator for OptionalValues<T, V, I> {
type Item = Option<T>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.validity
.next()
.map(|x| if x { self.values.next() } else { None })
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.validity.size_hint()
}
}

/// An iterator adapter that converts an iterator over items into an iterator over slices of
/// those N items.
///
/// This iterator is best used with iterators that implement `nth` since skipping items
/// allows this iterator to skip sequences of items without having to call each of them.
#[derive(Debug, Clone)]
pub struct SliceFilteredIter<I> {
iter: I,
selected_rows: VecDeque<Interval>,
current_remaining: usize,
current: usize, // position in the slice
}

impl<I> SliceFilteredIter<I> {
/// Return a new [`SliceFilteredIter`]
pub fn new(iter: I, selected_rows: VecDeque<Interval>) -> Self {
Self {
iter,
selected_rows,
current_remaining: 0,
current: 0,
}
}
}

impl<T, I: Iterator<Item = T>> Iterator for SliceFilteredIter<I> {
type Item = T;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.current_remaining == 0 {
if let Some(interval) = self.selected_rows.pop_front() {
// skip the hole between the previous start end this start
// (start + length) - start
let item = self.iter.nth(interval.start - self.current);
self.current = interval.start + interval.length;
self.current_remaining = interval.length - 1;
item
} else {
None
}
} else {
self.current_remaining -= 1;
self.iter.next()
}
}
}

#[cfg(test)]
mod test {
use std::collections::VecDeque;

use super::*;

#[test]
fn basic() {
let iter = 0..=100;

let intervals = vec![
Interval::new(0, 2),
Interval::new(20, 11),
Interval::new(31, 1),
];

let a: VecDeque<Interval> = intervals.clone().into_iter().collect();
let a = SliceFilteredIter::new(iter, a);

let expected: Vec<usize> = intervals
.into_iter()
.flat_map(|interval| interval.start..(interval.start + interval.length))
.collect();

assert_eq!(expected, a.collect::<Vec<_>>());
}
}
Loading

0 comments on commit f8eb243

Please sign in to comment.