Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved bitpacking #176

Merged
merged 4 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ bench = false

[dependencies]
parquet-format-safe = "0.2"
bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] }
seq-macro = { version = "0.3", default-features = false }
streaming-decompression = "0.1"

async-stream = { version = "0.3.2", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions benches/decode_bitpacking.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};

use parquet2::encoding::bitpacking::Decoder;
use parquet2::encoding::bitpacked::Decoder;

fn add_benchmark(c: &mut Criterion) {
(10..=20).step_by(2).for_each(|log2_size| {
Expand All @@ -11,7 +11,7 @@ fn add_benchmark(c: &mut Criterion) {
.collect::<Vec<_>>();

c.bench_function(&format!("bitpacking 2^{}", log2_size), |b| {
b.iter(|| Decoder::new(&bytes, 1, size).count())
b.iter(|| Decoder::<u32>::new(&bytes, 1, size).count())
});
})
}
Expand Down
173 changes: 173 additions & 0 deletions src/encoding/bitpacked/decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use super::{Packed, Unpackable, Unpacked};

/// An [`Iterator`] of [`Unpackable`] unpacked from a bitpacked slice of bytes.
/// # Implementation
/// This iterator unpacks bytes in chunks and does not allocate.
#[derive(Debug, Clone)]
pub struct Decoder<'a, T: Unpackable> {
packed: std::slice::Chunks<'a, u8>,
num_bits: usize,
remaining: usize,
current_pack_index: usize, // invariant: < T::PACK_LENGTH
unpacked: T::Unpacked, // has the current unpacked values.
}

#[inline]
fn decode_pack<T: Unpackable>(packed: &[u8], num_bits: usize, unpacked: &mut T::Unpacked) {
if packed.len() < T::Unpacked::LENGTH * num_bits / 8 {
let mut buf = T::Packed::zero();
buf.as_mut()[..packed.len()].copy_from_slice(packed);
T::unpack(buf.as_ref(), num_bits, unpacked)
} else {
T::unpack(packed, num_bits, unpacked)
}
}

impl<'a, T: Unpackable> Decoder<'a, T> {
/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.
pub fn new(packed: &'a [u8], num_bits: usize, mut length: usize) -> Self {
let block_size = std::mem::size_of::<T>() * num_bits;

let mut packed = packed.chunks(block_size);
let mut unpacked = T::Unpacked::zero();
if let Some(chunk) = packed.next() {
decode_pack::<T>(chunk, num_bits, &mut unpacked);
} else {
length = 0
};

Self {
remaining: length,
packed,
num_bits,
unpacked,
current_pack_index: 0,
}
}
}

impl<'a, T: Unpackable> Iterator for Decoder<'a, T> {
type Item = T;

#[inline] // -71% improvement in bench
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return None;
}
let result = self.unpacked[self.current_pack_index];
self.current_pack_index += 1;
self.remaining -= 1;
if self.current_pack_index == T::Unpacked::LENGTH {
if let Some(packed) = self.packed.next() {
decode_pack::<T>(packed, self.num_bits, &mut self.unpacked);
self.current_pack_index = 0;
}
}
Some(result)
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(self.remaining, Some(self.remaining))
}
}

#[cfg(test)]
mod tests {
use super::super::tests::case1;
use super::*;

#[test]
fn test_decode_rle() {
// Test data: 0-7 with bit width 3
// 0: 000
// 1: 001
// 2: 010
// 3: 011
// 4: 100
// 5: 101
// 6: 110
// 7: 111
let num_bits = 3;
let length = 8;
// encoded: 0b10001000u8, 0b11000110, 0b11111010
let data = vec![0b10001000u8, 0b11000110, 0b11111010];

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 2, 3, 4, 5, 6, 7]);
}

#[test]
fn decode_large() {
let (num_bits, expected, data) = case1();

let decoded = Decoder::<u32>::new(&data, num_bits, expected.len()).collect::<Vec<_>>();
assert_eq!(decoded, expected);
}

#[test]
fn test_decode_bool() {
let num_bits = 1;
let length = 8;
let data = vec![0b10101010];

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);
}

#[test]
fn test_decode_u64() {
let num_bits = 1;
let length = 8;
let data = vec![0b10101010];

let decoded = Decoder::<u64>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);
}

#[test]
fn even_case() {
// [0, 1, 2, 3, 4, 5, 6, 0]x99
let data = &[0b10001000u8, 0b11000110, 0b00011010];
let num_bits = 3;
let copies = 99; // 8 * 99 % 32 != 0
let expected = std::iter::repeat(&[0u32, 1, 2, 3, 4, 5, 6, 0])
.take(copies)
.flatten()
.copied()
.collect::<Vec<_>>();
let data = std::iter::repeat(data)
.take(copies)
.flatten()
.copied()
.collect::<Vec<_>>();
let length = expected.len();

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, expected);
}

#[test]
fn odd_case() {
// [0, 1, 2, 3, 4, 5, 6, 0]x4 + [2]
let data = &[0b10001000u8, 0b11000110, 0b00011010];
let num_bits = 3;
let copies = 4;
let expected = std::iter::repeat(&[0u32, 1, 2, 3, 4, 5, 6, 0])
.take(copies)
.flatten()
.copied()
.chain(std::iter::once(2))
.collect::<Vec<_>>();
let data = std::iter::repeat(data)
.take(copies)
.flatten()
.copied()
.chain(std::iter::once(0b00000010u8))
.collect::<Vec<_>>();
let length = expected.len();

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, expected);
}
}
54 changes: 54 additions & 0 deletions src/encoding/bitpacked/encode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::convert::TryInto;

use super::{Packed, Unpackable, Unpacked};

/// Encodes (packs) a slice of [`Unpackable`] into bitpacked bytes `packed`, using `num_bits` per value.
///
/// This function assumes that the maximum value in `unpacked` fits in `num_bits` bits
/// and saturates higher values.
///
/// Only the first `ceil8(unpacked.len() * num_bits)` of `packed` are populated.
pub fn encode<T: Unpackable>(unpacked: &[T], num_bits: usize, packed: &mut [u8]) {
let chunks = unpacked.chunks_exact(T::Unpacked::LENGTH);

let remainder = chunks.remainder();

let packed_size = (T::Unpacked::LENGTH * num_bits + 7) / 8;
if !remainder.is_empty() {
let packed_chunks = packed.chunks_mut(packed_size);
let mut last_chunk = T::Unpacked::zero();
for i in 0..remainder.len() {
last_chunk[i] = remainder[i]
}

chunks
.chain(std::iter::once(last_chunk.as_ref()))
.zip(packed_chunks)
.for_each(|(unpacked, packed)| {
T::pack(&unpacked.try_into().unwrap(), num_bits, packed);
});
} else {
let packed_chunks = packed.chunks_exact_mut(packed_size);
chunks.zip(packed_chunks).for_each(|(unpacked, packed)| {
T::pack(&unpacked.try_into().unwrap(), num_bits, packed);
});
}
}

/// Encodes (packs) a potentially incomplete pack of [`Unpackable`] into bitpacked
/// bytes `packed`, using `num_bits` per value.
///
/// This function assumes that the maximum value in `unpacked` fits in `num_bits` bits
/// and saturates higher values.
///
/// Only the first `ceil8(unpacked.len() * num_bits)` of `packed` are populated.
#[inline]
pub fn encode_pack<T: Unpackable>(unpacked: &[T], num_bits: usize, packed: &mut [u8]) {
if unpacked.len() < T::Packed::LENGTH {
let mut complete_unpacked = T::Unpacked::zero();
complete_unpacked.as_mut()[..unpacked.len()].copy_from_slice(unpacked);
T::pack(&complete_unpacked, num_bits, packed)
} else {
T::pack(&unpacked.try_into().unwrap(), num_bits, packed)
}
}
Loading