diff --git a/Cargo.toml b/Cargo.toml index 54f1dd7cc..ccece23b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ bench = false [dependencies] parquet-format-safe = "0.2" -bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] } +seq-macro = { version = "0.3", default-features = false } streaming-decompression = "0.1" async-stream = { version = "0.3.2", optional = true } diff --git a/benches/decode_bitpacking.rs b/benches/decode_bitpacking.rs index 70b5bffdd..0023550e3 100644 --- a/benches/decode_bitpacking.rs +++ b/benches/decode_bitpacking.rs @@ -1,6 +1,6 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use parquet2::encoding::bitpacking::Decoder; +use parquet2::encoding::bitpacked::Decoder; fn add_benchmark(c: &mut Criterion) { (10..=20).step_by(2).for_each(|log2_size| { @@ -11,7 +11,7 @@ fn add_benchmark(c: &mut Criterion) { .collect::>(); c.bench_function(&format!("bitpacking 2^{}", log2_size), |b| { - b.iter(|| Decoder::new(&bytes, 1, size).count()) + b.iter(|| Decoder::::new(&bytes, 1, size).count()) }); }) } diff --git a/src/encoding/bitpacked/decode.rs b/src/encoding/bitpacked/decode.rs new file mode 100644 index 000000000..32c78c230 --- /dev/null +++ b/src/encoding/bitpacked/decode.rs @@ -0,0 +1,173 @@ +use super::{Packed, Unpackable, Unpacked}; + +/// An [`Iterator`] of [`Unpackable`] unpacked from a bitpacked slice of bytes. +/// # Implementation +/// This iterator unpacks bytes in chunks and does not allocate. +#[derive(Debug, Clone)] +pub struct Decoder<'a, T: Unpackable> { + packed: std::slice::Chunks<'a, u8>, + num_bits: usize, + remaining: usize, + current_pack_index: usize, // invariant: < T::PACK_LENGTH + unpacked: T::Unpacked, // has the current unpacked values. +} + +#[inline] +fn decode_pack(packed: &[u8], num_bits: usize, unpacked: &mut T::Unpacked) { + if packed.len() < T::Unpacked::LENGTH * num_bits / 8 { + let mut buf = T::Packed::zero(); + buf.as_mut()[..packed.len()].copy_from_slice(packed); + T::unpack(buf.as_ref(), num_bits, unpacked) + } else { + T::unpack(packed, num_bits, unpacked) + } +} + +impl<'a, T: Unpackable> Decoder<'a, T> { + /// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`. + pub fn new(packed: &'a [u8], num_bits: usize, mut length: usize) -> Self { + let block_size = std::mem::size_of::() * num_bits; + + let mut packed = packed.chunks(block_size); + let mut unpacked = T::Unpacked::zero(); + if let Some(chunk) = packed.next() { + decode_pack::(chunk, num_bits, &mut unpacked); + } else { + length = 0 + }; + + Self { + remaining: length, + packed, + num_bits, + unpacked, + current_pack_index: 0, + } + } +} + +impl<'a, T: Unpackable> Iterator for Decoder<'a, T> { + type Item = T; + + #[inline] // -71% improvement in bench + fn next(&mut self) -> Option { + if self.remaining == 0 { + return None; + } + let result = self.unpacked[self.current_pack_index]; + self.current_pack_index += 1; + self.remaining -= 1; + if self.current_pack_index == T::Unpacked::LENGTH { + if let Some(packed) = self.packed.next() { + decode_pack::(packed, self.num_bits, &mut self.unpacked); + self.current_pack_index = 0; + } + } + Some(result) + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + (self.remaining, Some(self.remaining)) + } +} + +#[cfg(test)] +mod tests { + use super::super::tests::case1; + use super::*; + + #[test] + fn test_decode_rle() { + // Test data: 0-7 with bit width 3 + // 0: 000 + // 1: 001 + // 2: 010 + // 3: 011 + // 4: 100 + // 5: 101 + // 6: 110 + // 7: 111 + let num_bits = 3; + let length = 8; + // encoded: 0b10001000u8, 0b11000110, 0b11111010 + let data = vec![0b10001000u8, 0b11000110, 0b11111010]; + + let decoded = Decoder::::new(&data, num_bits, length).collect::>(); + assert_eq!(decoded, vec![0, 1, 2, 3, 4, 5, 6, 7]); + } + + #[test] + fn decode_large() { + let (num_bits, expected, data) = case1(); + + let decoded = Decoder::::new(&data, num_bits, expected.len()).collect::>(); + assert_eq!(decoded, expected); + } + + #[test] + fn test_decode_bool() { + let num_bits = 1; + let length = 8; + let data = vec![0b10101010]; + + let decoded = Decoder::::new(&data, num_bits, length).collect::>(); + assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]); + } + + #[test] + fn test_decode_u64() { + let num_bits = 1; + let length = 8; + let data = vec![0b10101010]; + + let decoded = Decoder::::new(&data, num_bits, length).collect::>(); + assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]); + } + + #[test] + fn even_case() { + // [0, 1, 2, 3, 4, 5, 6, 0]x99 + let data = &[0b10001000u8, 0b11000110, 0b00011010]; + let num_bits = 3; + let copies = 99; // 8 * 99 % 32 != 0 + let expected = std::iter::repeat(&[0u32, 1, 2, 3, 4, 5, 6, 0]) + .take(copies) + .flatten() + .copied() + .collect::>(); + let data = std::iter::repeat(data) + .take(copies) + .flatten() + .copied() + .collect::>(); + let length = expected.len(); + + let decoded = Decoder::::new(&data, num_bits, length).collect::>(); + assert_eq!(decoded, expected); + } + + #[test] + fn odd_case() { + // [0, 1, 2, 3, 4, 5, 6, 0]x4 + [2] + let data = &[0b10001000u8, 0b11000110, 0b00011010]; + let num_bits = 3; + let copies = 4; + let expected = std::iter::repeat(&[0u32, 1, 2, 3, 4, 5, 6, 0]) + .take(copies) + .flatten() + .copied() + .chain(std::iter::once(2)) + .collect::>(); + let data = std::iter::repeat(data) + .take(copies) + .flatten() + .copied() + .chain(std::iter::once(0b00000010u8)) + .collect::>(); + let length = expected.len(); + + let decoded = Decoder::::new(&data, num_bits, length).collect::>(); + assert_eq!(decoded, expected); + } +} diff --git a/src/encoding/bitpacked/encode.rs b/src/encoding/bitpacked/encode.rs new file mode 100644 index 000000000..904ff796d --- /dev/null +++ b/src/encoding/bitpacked/encode.rs @@ -0,0 +1,54 @@ +use std::convert::TryInto; + +use super::{Packed, Unpackable, Unpacked}; + +/// Encodes (packs) a slice of [`Unpackable`] into bitpacked bytes `packed`, using `num_bits` per value. +/// +/// This function assumes that the maximum value in `unpacked` fits in `num_bits` bits +/// and saturates higher values. +/// +/// Only the first `ceil8(unpacked.len() * num_bits)` of `packed` are populated. +pub fn encode(unpacked: &[T], num_bits: usize, packed: &mut [u8]) { + let chunks = unpacked.chunks_exact(T::Unpacked::LENGTH); + + let remainder = chunks.remainder(); + + let packed_size = (T::Unpacked::LENGTH * num_bits + 7) / 8; + if !remainder.is_empty() { + let packed_chunks = packed.chunks_mut(packed_size); + let mut last_chunk = T::Unpacked::zero(); + for i in 0..remainder.len() { + last_chunk[i] = remainder[i] + } + + chunks + .chain(std::iter::once(last_chunk.as_ref())) + .zip(packed_chunks) + .for_each(|(unpacked, packed)| { + T::pack(&unpacked.try_into().unwrap(), num_bits, packed); + }); + } else { + let packed_chunks = packed.chunks_exact_mut(packed_size); + chunks.zip(packed_chunks).for_each(|(unpacked, packed)| { + T::pack(&unpacked.try_into().unwrap(), num_bits, packed); + }); + } +} + +/// Encodes (packs) a potentially incomplete pack of [`Unpackable`] into bitpacked +/// bytes `packed`, using `num_bits` per value. +/// +/// This function assumes that the maximum value in `unpacked` fits in `num_bits` bits +/// and saturates higher values. +/// +/// Only the first `ceil8(unpacked.len() * num_bits)` of `packed` are populated. +#[inline] +pub fn encode_pack(unpacked: &[T], num_bits: usize, packed: &mut [u8]) { + if unpacked.len() < T::Packed::LENGTH { + let mut complete_unpacked = T::Unpacked::zero(); + complete_unpacked.as_mut()[..unpacked.len()].copy_from_slice(unpacked); + T::pack(&complete_unpacked, num_bits, packed) + } else { + T::pack(&unpacked.try_into().unwrap(), num_bits, packed) + } +} diff --git a/src/encoding/bitpacked/mod.rs b/src/encoding/bitpacked/mod.rs new file mode 100644 index 000000000..a05ca2040 --- /dev/null +++ b/src/encoding/bitpacked/mod.rs @@ -0,0 +1,220 @@ +mod decode; +mod encode; +mod pack; +mod unpack; + +pub use decode::Decoder; +pub use encode::{encode, encode_pack}; + +/// A byte slice (e.g. `[u8; 8]`) denoting types that represent complete packs. +pub trait Packed: + Copy + + Sized + + AsRef<[u8]> + + AsMut<[u8]> + + std::ops::IndexMut + + for<'a> TryFrom<&'a [u8]> +{ + const LENGTH: usize; + fn zero() -> Self; +} + +impl Packed for [u8; 8] { + const LENGTH: usize = 8; + #[inline] + fn zero() -> Self { + [0; 8] + } +} + +impl Packed for [u8; 16 * 2] { + const LENGTH: usize = 16 * 2; + #[inline] + fn zero() -> Self { + [0; 16 * 2] + } +} + +impl Packed for [u8; 32 * 4] { + const LENGTH: usize = 32 * 4; + #[inline] + fn zero() -> Self { + [0; 32 * 4] + } +} + +impl Packed for [u8; 64 * 64] { + const LENGTH: usize = 64 * 64; + #[inline] + fn zero() -> Self { + [0; 64 * 64] + } +} + +/// A byte slice of [`Unpackable`] denoting complete unpacked arrays. +pub trait Unpacked: + Copy + + Sized + + AsRef<[T]> + + AsMut<[T]> + + std::ops::Index + + std::ops::IndexMut + + for<'a> TryFrom<&'a [T], Error = std::array::TryFromSliceError> +{ + const LENGTH: usize; + fn zero() -> Self; +} + +impl Unpacked for [u8; 8] { + const LENGTH: usize = 8; + #[inline] + fn zero() -> Self { + [0; 8] + } +} + +impl Unpacked for [u16; 16] { + const LENGTH: usize = 16; + #[inline] + fn zero() -> Self { + [0; 16] + } +} + +impl Unpacked for [u32; 32] { + const LENGTH: usize = 32; + #[inline] + fn zero() -> Self { + [0; 32] + } +} + +impl Unpacked for [u64; 64] { + const LENGTH: usize = 64; + #[inline] + fn zero() -> Self { + [0; 64] + } +} + +/// A type representing a type that can be bitpacked and unpacked by this crate. +pub trait Unpackable: Copy + Sized + Default { + type Packed: Packed; + type Unpacked: Unpacked; + fn unpack(packed: &[u8], num_bits: usize, unpacked: &mut Self::Unpacked); + fn pack(unpacked: &Self::Unpacked, num_bits: usize, packed: &mut [u8]); +} + +impl Unpackable for u8 { + type Packed = [u8; 8]; + type Unpacked = [u8; 8]; + + #[inline] + fn unpack(packed: &[u8], num_bits: usize, unpacked: &mut Self::Unpacked) { + unpack::unpack8(packed, unpacked, num_bits) + } + + #[inline] + fn pack(packed: &Self::Unpacked, num_bits: usize, unpacked: &mut [u8]) { + pack::pack8(packed, unpacked, num_bits) + } +} + +impl Unpackable for u16 { + type Packed = [u8; 16 * 2]; + type Unpacked = [u16; 16]; + + #[inline] + fn unpack(packed: &[u8], num_bits: usize, unpacked: &mut Self::Unpacked) { + unpack::unpack16(packed, unpacked, num_bits) + } + + #[inline] + fn pack(packed: &Self::Unpacked, num_bits: usize, unpacked: &mut [u8]) { + pack::pack16(packed, unpacked, num_bits) + } +} + +impl Unpackable for u32 { + type Packed = [u8; 32 * 4]; + type Unpacked = [u32; 32]; + + #[inline] + fn unpack(packed: &[u8], num_bits: usize, unpacked: &mut Self::Unpacked) { + unpack::unpack32(packed, unpacked, num_bits) + } + + #[inline] + fn pack(packed: &Self::Unpacked, num_bits: usize, unpacked: &mut [u8]) { + pack::pack32(packed, unpacked, num_bits) + } +} + +impl Unpackable for u64 { + type Packed = [u8; 64 * 64]; + type Unpacked = [u64; 64]; + + #[inline] + fn unpack(packed: &[u8], num_bits: usize, unpacked: &mut Self::Unpacked) { + unpack::unpack64(packed, unpacked, num_bits) + } + + #[inline] + fn pack(packed: &Self::Unpacked, num_bits: usize, unpacked: &mut [u8]) { + pack::pack64(packed, unpacked, num_bits) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + pub fn case1() -> (usize, Vec, Vec) { + let num_bits = 3; + let compressed = vec![ + 0b10001000u8, + 0b11000110, + 0b11111010, + 0b10001000u8, + 0b11000110, + 0b11111010, + 0b10001000u8, + 0b11000110, + 0b11111010, + 0b10001000u8, + 0b11000110, + 0b11111010, + 0b10001000u8, + 0b11000110, + 0b11111010, + ]; + let decompressed = vec![ + 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, + 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, + ]; + (num_bits, decompressed, compressed) + } + + #[test] + fn encode_large() { + let (num_bits, unpacked, expected) = case1(); + let mut packed = vec![0u8; 4 * 32]; + + encode(&unpacked, num_bits, &mut packed); + assert_eq!(&packed[..15], expected); + } + + #[test] + fn test_encode() { + let num_bits = 3; + let unpacked = vec![0, 1, 2, 3, 4, 5, 6, 7]; + + let mut packed = vec![0u8; 4 * 32]; + + encode::(&unpacked, num_bits, &mut packed); + + let expected = vec![0b10001000u8, 0b11000110, 0b11111010]; + + assert_eq!(&packed[..3], expected); + } +} diff --git a/src/encoding/bitpacked/pack.rs b/src/encoding/bitpacked/pack.rs new file mode 100644 index 000000000..55183d36d --- /dev/null +++ b/src/encoding/bitpacked/pack.rs @@ -0,0 +1,108 @@ +/// Macro that generates a packing function taking the number of bits as a const generic +macro_rules! pack_impl { + ($t:ty, $bytes:literal, $bits:tt) => { + pub fn pack(input: &[$t; $bits], output: &mut [u8]) { + if NUM_BITS == 0 { + for out in output { + *out = 0; + } + return; + } + assert!(NUM_BITS <= $bytes * 8); + assert!(output.len() >= NUM_BITS * $bytes); + + let mask = match NUM_BITS { + $bits => <$t>::MAX, + _ => ((1 << NUM_BITS) - 1), + }; + + for i in 0..$bits { + let start_bit = i * NUM_BITS; + let end_bit = start_bit + NUM_BITS; + + let start_bit_offset = start_bit % $bits; + let end_bit_offset = end_bit % $bits; + let start_byte = start_bit / $bits; + let end_byte = end_bit / $bits; + if start_byte != end_byte && end_bit_offset != 0 { + let a = input[i] << start_bit_offset; + let val_a = <$t>::to_le_bytes(a); + for i in 0..$bytes { + output[start_byte * $bytes + i] |= val_a[i] + } + + let b = (input[i] >> (NUM_BITS - end_bit_offset)) & mask; + let val_b = <$t>::to_le_bytes(b); + for i in 0..$bytes { + output[end_byte * $bytes + i] |= val_b[i] + } + } else { + let val = (input[i] & mask) << start_bit_offset; + let val = <$t>::to_le_bytes(val); + + for i in 0..$bytes { + output[start_byte * $bytes + i] |= val[i] + } + } + } + } + }; +} + +/// Macro that generates pack functions that accept num_bits as a parameter +macro_rules! pack { + ($name:ident, $t:ty, $bytes:literal, $bits:tt) => { + mod $name { + pack_impl!($t, $bytes, $bits); + } + + /// Pack unpacked `input` into `output` with a bit width of `num_bits` + pub fn $name(input: &[$t; $bits], output: &mut [u8], num_bits: usize) { + // This will get optimised into a jump table + seq_macro::seq!(i in 0..=$bits { + if i == num_bits { + return $name::pack::(input, output); + } + }); + unreachable!("invalid num_bits {}", num_bits); + } + }; +} + +pack!(pack8, u8, 1, 8); +pack!(pack16, u16, 2, 16); +pack!(pack32, u32, 4, 32); +pack!(pack64, u64, 8, 64); + +#[cfg(test)] +mod tests { + use super::super::unpack::*; + use super::*; + + #[test] + fn test_basic() { + let input = [0u16, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]; + for num_bits in 4..16 { + let mut output = [0u8; 16 * 2]; + pack16(&input, &mut output, num_bits); + let mut other = [0u16; 16]; + unpack16(&output, &mut other, num_bits); + assert_eq!(other, input); + } + } + + #[test] + fn test_u32() { + let input = [ + 0u32, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0u32, 1, 2, 3, 4, 5, 6, 7, 8, + 9, 10, 11, 12, 13, 14, 15, + ]; + for num_bits in 4..32 { + let mut output = [0u8; 32 * 4]; + pack32(&input, &mut output, num_bits); + let mut other = [0u32; 32]; + unpack32(&output, &mut other, num_bits); + assert_eq!(other, input); + } + } +} diff --git a/src/encoding/bitpacked/unpack.rs b/src/encoding/bitpacked/unpack.rs new file mode 100644 index 000000000..061b3acef --- /dev/null +++ b/src/encoding/bitpacked/unpack.rs @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Copied from https://github.com/apache/arrow-rs/blob/6859efa690d4c9530cf8a24053bc6ed81025a164/parquet/src/util/bit_pack.rs + +/// Macro that generates an unpack function taking the number of bits as a const generic +macro_rules! unpack_impl { + ($t:ty, $bytes:literal, $bits:tt) => { + pub fn unpack(input: &[u8], output: &mut [$t; $bits]) { + if NUM_BITS == 0 { + for out in output { + *out = 0; + } + return; + } + + assert!(NUM_BITS <= $bytes * 8); + + let mask = match NUM_BITS { + $bits => <$t>::MAX, + _ => ((1 << NUM_BITS) - 1), + }; + + assert!(input.len() >= NUM_BITS * $bytes); + + let r = |output_idx: usize| { + <$t>::from_le_bytes( + input[output_idx * $bytes..output_idx * $bytes + $bytes] + .try_into() + .unwrap(), + ) + }; + + seq_macro::seq!(i in 0..$bits { + let start_bit = i * NUM_BITS; + let end_bit = start_bit + NUM_BITS; + + let start_bit_offset = start_bit % $bits; + let end_bit_offset = end_bit % $bits; + let start_byte = start_bit / $bits; + let end_byte = end_bit / $bits; + if start_byte != end_byte && end_bit_offset != 0 { + let val = r(start_byte); + let a = val >> start_bit_offset; + let val = r(end_byte); + let b = val << (NUM_BITS - end_bit_offset); + + output[i] = a | (b & mask); + } else { + let val = r(start_byte); + output[i] = (val >> start_bit_offset) & mask; + } + }); + } + }; +} + +/// Macro that generates unpack functions that accept num_bits as a parameter +macro_rules! unpack { + ($name:ident, $t:ty, $bytes:literal, $bits:tt) => { + mod $name { + unpack_impl!($t, $bytes, $bits); + } + + /// Unpack packed `input` into `output` with a bit width of `num_bits` + pub fn $name(input: &[u8], output: &mut [$t; $bits], num_bits: usize) { + // This will get optimised into a jump table + seq_macro::seq!(i in 0..=$bits { + if i == num_bits { + return $name::unpack::(input, output); + } + }); + unreachable!("invalid num_bits {}", num_bits); + } + }; +} + +unpack!(unpack8, u8, 1, 8); +unpack!(unpack16, u16, 2, 16); +unpack!(unpack32, u32, 4, 32); +unpack!(unpack64, u64, 8, 64); + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_basic() { + let input = [0xFF; 4096]; + + for i in 0..=8 { + let mut output = [0; 8]; + unpack8(&input, &mut output, i); + for (idx, out) in output.iter().enumerate() { + assert_eq!(out.trailing_ones() as usize, i, "out[{}] = {}", idx, out); + } + } + + for i in 0..=16 { + let mut output = [0; 16]; + unpack16(&input, &mut output, i); + for (idx, out) in output.iter().enumerate() { + assert_eq!(out.trailing_ones() as usize, i, "out[{}] = {}", idx, out); + } + } + + for i in 0..=32 { + let mut output = [0; 32]; + unpack32(&input, &mut output, i); + for (idx, out) in output.iter().enumerate() { + assert_eq!(out.trailing_ones() as usize, i, "out[{}] = {}", idx, out); + } + } + + for i in 0..=64 { + let mut output = [0; 64]; + unpack64(&input, &mut output, i); + for (idx, out) in output.iter().enumerate() { + assert_eq!(out.trailing_ones() as usize, i, "out[{}] = {}", idx, out); + } + } + } +} diff --git a/src/encoding/bitpacking.rs b/src/encoding/bitpacking.rs deleted file mode 100644 index f92bf9a4d..000000000 --- a/src/encoding/bitpacking.rs +++ /dev/null @@ -1,257 +0,0 @@ -use std::convert::TryInto; -use std::mem; - -/// Usual bitpacking -use bitpacking::BitPacker; -use bitpacking::BitPacker1x; - -use super::ceil8; - -pub const BLOCK_LEN: usize = bitpacking::BitPacker1x::BLOCK_LEN; - -/// Encodes `u32` values into a buffer using `num_bits`. -pub fn encode(decompressed: &[u32], num_bits: u8, compressed: &mut [u8]) -> usize { - let chunks = decompressed.chunks_exact(BitPacker1x::BLOCK_LEN); - - let remainder = chunks.remainder(); - - let size = ceil8(BitPacker1x::BLOCK_LEN * num_bits as usize); - if !remainder.is_empty() { - let mut last_chunk = remainder.to_vec(); - let trailing = BitPacker1x::BLOCK_LEN - remainder.len(); - last_chunk.extend(std::iter::repeat(0).take(trailing)); - - let mut compressed_len = 0; - chunks - .chain(std::iter::once(last_chunk.as_ref())) - .for_each(|chunk| { - let chunk_compressed = &mut compressed[compressed_len..compressed_len + size]; - compressed_len += - encode_pack(chunk.try_into().unwrap(), num_bits, chunk_compressed); - }); - } else { - let mut compressed_len = 0; - chunks.for_each(|chunk| { - let chunk_compressed = &mut compressed[compressed_len..compressed_len + size]; - compressed_len += encode_pack(chunk.try_into().unwrap(), num_bits, chunk_compressed); - }); - } - - decompressed.len() * num_bits as usize / 8 -} - -/// Encodes `u32` values into a buffer using `num_bits`. -#[inline] -pub fn encode_pack(decompressed: [u32; BLOCK_LEN], num_bits: u8, compressed: &mut [u8]) -> usize { - BitPacker1x::new().compress(&decompressed, compressed, num_bits) -} - -#[derive(Debug, Clone)] -pub struct Decoder<'a> { - compressed_chunks: std::slice::Chunks<'a, u8>, - num_bits: u8, - remaining: usize, - current_pack_index: usize, // invariant: ()]; - buf[..compressed.len()].copy_from_slice(compressed); - BitPacker1x::new().decompress(&buf, pack, num_bits); - } else { - BitPacker1x::new().decompress(compressed, pack, num_bits); - } -} - -impl<'a> Decoder<'a> { - pub fn new(compressed: &'a [u8], num_bits: u8, mut length: usize) -> Self { - let compressed_block_size = BitPacker1x::BLOCK_LEN * num_bits as usize / 8; - - let mut compressed_chunks = compressed.chunks(compressed_block_size); - let mut current_pack = [0; BitPacker1x::BLOCK_LEN]; - if let Some(chunk) = compressed_chunks.next() { - decode_pack(chunk, num_bits, &mut current_pack); - } else { - length = 0 - }; - - Self { - remaining: length, - compressed_chunks, - num_bits, - current_pack, - current_pack_index: 0, - } - } -} - -impl<'a> Iterator for Decoder<'a> { - type Item = u32; - - #[inline] // -71% improvement in bench - fn next(&mut self) -> Option { - if self.remaining == 0 { - return None; - } - let result = self.current_pack[self.current_pack_index]; - self.current_pack_index += 1; - if self.current_pack_index == BitPacker1x::BLOCK_LEN { - if let Some(chunk) = self.compressed_chunks.next() { - decode_pack(chunk, self.num_bits, &mut self.current_pack); - self.current_pack_index = 0; - } - } - self.remaining -= 1; - Some(result) - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - (self.remaining, Some(self.remaining)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_decode_rle() { - // Test data: 0-7 with bit width 3 - // 0: 000 - // 1: 001 - // 2: 010 - // 3: 011 - // 4: 100 - // 5: 101 - // 6: 110 - // 7: 111 - let num_bits = 3; - let length = 8; - // encoded: 0b10001000u8, 0b11000110, 0b11111010 - let data = vec![0b10001000u8, 0b11000110, 0b11111010]; - - let decoded = Decoder::new(&data, num_bits, length).collect::>(); - assert_eq!(decoded, vec![0, 1, 2, 3, 4, 5, 6, 7]); - } - - fn case1() -> (u8, Vec, Vec) { - let num_bits = 3; - let compressed = vec![ - 0b10001000u8, - 0b11000110, - 0b11111010, - 0b10001000u8, - 0b11000110, - 0b11111010, - 0b10001000u8, - 0b11000110, - 0b11111010, - 0b10001000u8, - 0b11000110, - 0b11111010, - 0b10001000u8, - 0b11000110, - 0b11111010, - ]; - let decompressed = vec![ - 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, - 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, - ]; - (num_bits, decompressed, compressed) - } - - #[test] - fn decode_large() { - let (num_bits, expected, data) = case1(); - - let decoded = Decoder::new(&data, num_bits, expected.len()).collect::>(); - assert_eq!(decoded, expected); - } - - #[test] - fn encode_large() { - let (num_bits, data, expected) = case1(); - let mut compressed = vec![0u8; 4 * BitPacker1x::BLOCK_LEN]; - - let compressed_len = encode(&data, num_bits, &mut compressed); - compressed.truncate(compressed_len); - assert_eq!(compressed, expected); - } - - #[test] - fn test_encode() { - let num_bits = 3; - let data = vec![0, 1, 2, 3, 4, 5, 6, 7]; - - let mut compressed = vec![0u8; 4 * BitPacker1x::BLOCK_LEN]; - - let compressed_len = encode(&data, num_bits, &mut compressed); - compressed.truncate(compressed_len); - - let expected = vec![0b10001000u8, 0b11000110, 0b11111010]; - - assert_eq!(compressed, expected); - } - - #[test] - fn test_decode_bool() { - let num_bits = 1; - let length = 8; - let data = vec![0b10101010]; - - let decoded = Decoder::new(&data, num_bits, length).collect::>(); - assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]); - } - - #[test] - fn even_case() { - // [0, 1, 2, 3, 4, 5, 6, 0]x99 - let data = &[0b10001000u8, 0b11000110, 0b00011010]; - let num_bits = 3; - let copies = 99; // 8 * 99 % 32 != 0 - let expected = std::iter::repeat(&[0u32, 1, 2, 3, 4, 5, 6, 0]) - .take(copies) - .flatten() - .copied() - .collect::>(); - let data = std::iter::repeat(data) - .take(copies) - .flatten() - .copied() - .collect::>(); - let length = expected.len(); - - let decoded = Decoder::new(&data, num_bits, length).collect::>(); - assert_eq!(decoded, expected); - } - - #[test] - fn odd_case() { - // [0, 1, 2, 3, 4, 5, 6, 0]x4 + [2] - let data = &[0b10001000u8, 0b11000110, 0b00011010]; - let num_bits = 3; - let copies = 4; - let expected = std::iter::repeat(&[0u32, 1, 2, 3, 4, 5, 6, 0]) - .take(copies) - .flatten() - .copied() - .chain(std::iter::once(2)) - .collect::>(); - let data = std::iter::repeat(data) - .take(copies) - .flatten() - .copied() - .chain(std::iter::once(0b00000010u8)) - .collect::>(); - let length = expected.len(); - - let decoded = Decoder::new(&data, num_bits, length).collect::>(); - assert_eq!(decoded, expected); - } -} diff --git a/src/encoding/delta_bitpacked/decoder.rs b/src/encoding/delta_bitpacked/decoder.rs index 0187dc783..dcd519127 100644 --- a/src/encoding/delta_bitpacked/decoder.rs +++ b/src/encoding/delta_bitpacked/decoder.rs @@ -1,6 +1,6 @@ use crate::encoding::ceil8; -use super::super::bitpacking; +use super::super::bitpacked; use super::super::uleb128; use super::super::zigzag_leb128; @@ -15,7 +15,7 @@ struct Block<'a> { remaining: usize, // number of elements current_index: usize, // invariant: < values_per_mini_block // None represents a relative delta of zero, in which case there is no miniblock. - current_miniblock: Option>, + current_miniblock: Option>, // number of bytes consumed. consumed_bytes: usize, } @@ -57,19 +57,19 @@ impl<'a> Block<'a> { } fn advance_miniblock(&mut self) { - let num_bits = self.bitwidths[0]; + let num_bits = self.bitwidths[0] as usize; self.bitwidths = &self.bitwidths[1..]; self.current_miniblock = if num_bits > 0 { let length = std::cmp::min(self.remaining, self.values_per_mini_block); - let miniblock_length = ceil8(self.values_per_mini_block * num_bits as usize); + let miniblock_length = ceil8(self.values_per_mini_block * num_bits); let (miniblock, remainder) = self.values.split_at(miniblock_length); self.values = remainder; self.consumed_bytes += miniblock_length; - Some(bitpacking::Decoder::new(miniblock, num_bits, length)) + Some(bitpacked::Decoder::new(miniblock, num_bits, length)) } else { None }; diff --git a/src/encoding/delta_bitpacked/encoder.rs b/src/encoding/delta_bitpacked/encoder.rs index 541cc1301..3d0fd570f 100644 --- a/src/encoding/delta_bitpacked/encoder.rs +++ b/src/encoding/delta_bitpacked/encoder.rs @@ -1,6 +1,6 @@ use crate::encoding::ceil8; -use super::super::bitpacking; +use super::super::bitpacked; use super::super::uleb128; use super::super::zigzag_leb128; @@ -53,16 +53,17 @@ pub fn encode>(mut iterator: I, buffer: &mut Vec) { let num_bits = 64 - (max_delta - min_delta).leading_zeros(); buffer.push(num_bits as u8); + let num_bits = num_bits as usize; if num_bits > 0 { let start = buffer.len(); // bitpack encode all (deltas.len = 128 which is a multiple of 32) - let bytes_needed = start + ceil8(deltas.len() * num_bits as usize); + let bytes_needed = start + ceil8(deltas.len() * num_bits); buffer.resize(bytes_needed, 0); - bitpacking::encode(deltas.as_ref(), num_bits as u8, &mut buffer[start..]); + bitpacked::encode(deltas.as_ref(), num_bits, &mut buffer[start..]); - let bytes_needed = start + ceil8(deltas.len() * num_bits as usize); + let bytes_needed = start + ceil8(deltas.len() * num_bits); buffer.truncate(bytes_needed); } diff --git a/src/encoding/delta_bitpacked/mod.rs b/src/encoding/delta_bitpacked/mod.rs index a245b03d3..9d2f55f44 100644 --- a/src/encoding/delta_bitpacked/mod.rs +++ b/src/encoding/delta_bitpacked/mod.rs @@ -32,6 +32,7 @@ mod tests { assert_eq!(result, data); } + #[test] fn some() { let data = vec![ -2147483648, diff --git a/src/encoding/hybrid_rle/decoder.rs b/src/encoding/hybrid_rle/decoder.rs index bdd4b5081..f947cfe18 100644 --- a/src/encoding/hybrid_rle/decoder.rs +++ b/src/encoding/hybrid_rle/decoder.rs @@ -5,17 +5,17 @@ use super::{super::ceil8, HybridEncoded}; #[derive(Debug, Clone)] pub struct Decoder<'a> { values: &'a [u8], - num_bits: u32, + num_bits: usize, } impl<'a> Decoder<'a> { - pub fn new(values: &'a [u8], num_bits: u32) -> Self { + pub fn new(values: &'a [u8], num_bits: usize) -> Self { Self { values, num_bits } } /// Returns the number of bits being used by this decoder. #[inline] - pub fn num_bits(&self) -> u32 { + pub fn num_bits(&self) -> usize { self.num_bits } } @@ -35,7 +35,7 @@ impl<'a> Iterator for Decoder<'a> { }; if indicator & 1 == 1 { // is bitpacking - let bytes = (indicator as usize >> 1) * self.num_bits as usize; + let bytes = (indicator as usize >> 1) * self.num_bits; let bytes = std::cmp::min(bytes, self.values.len()); let (result, remaining) = self.values.split_at(bytes); let result = Some(HybridEncoded::Bitpacked(result)); @@ -45,7 +45,7 @@ impl<'a> Iterator for Decoder<'a> { // is rle let run_length = indicator as usize >> 1; // repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width) - let rle_bytes = ceil8(self.num_bits as usize); + let rle_bytes = ceil8(self.num_bits); let (result, remaining) = self.values.split_at(rle_bytes); let result = Some(HybridEncoded::Rle(result, run_length)); self.values = remaining; @@ -58,11 +58,11 @@ impl<'a> Iterator for Decoder<'a> { mod tests { use super::*; - use super::super::super::bitpacking; + use super::super::super::bitpacked; #[test] fn basics_1() { - let bit_width = 1; + let bit_width = 1usize; let length = 5; let values = vec![ 2, 0, 0, 0, // length @@ -76,7 +76,7 @@ mod tests { if let HybridEncoded::Bitpacked(values) = run { assert_eq!(values, &[0b00001011]); let result = - bitpacking::Decoder::new(values, bit_width as u8, length).collect::>(); + bitpacked::Decoder::::new(values, bit_width, length).collect::>(); assert_eq!(result, &[1, 1, 0, 1, 0]); } else { panic!() @@ -100,7 +100,7 @@ mod tests { if let HybridEncoded::Bitpacked(values) = run { assert_eq!(values, &[0b11101011, 0b00000010]); - let result = bitpacking::Decoder::new(values, bit_width as u8, 10).collect::>(); + let result = bitpacked::Decoder::::new(values, bit_width, 10).collect::>(); assert_eq!(result, expected); } else { panic!() diff --git a/src/encoding/hybrid_rle/encoder.rs b/src/encoding/hybrid_rle/encoder.rs index 584f98462..97c38863d 100644 --- a/src/encoding/hybrid_rle/encoder.rs +++ b/src/encoding/hybrid_rle/encoder.rs @@ -1,4 +1,4 @@ -use crate::encoding::bitpacking; +use crate::encoding::bitpacked; use crate::encoding::{ceil8, uleb128}; use std::io::Write; @@ -9,8 +9,9 @@ use super::bitpacked_encode; pub fn encode_u32>( writer: &mut W, iterator: I, - num_bits: u8, + num_bits: u32, ) -> std::io::Result<()> { + let num_bits = num_bits as u8; // the length of the iterator. let length = iterator.size_hint().1.unwrap(); @@ -22,44 +23,50 @@ pub fn encode_u32>( let used = uleb128::encode(header, &mut container); writer.write_all(&container[..used])?; - bitpacked_encode_u32(writer, iterator, num_bits)?; + bitpacked_encode_u32(writer, iterator, num_bits as usize)?; Ok(()) } +const U32_BLOCK_LEN: usize = 32; + fn bitpacked_encode_u32>( writer: &mut W, mut iterator: I, - num_bits: u8, + num_bits: usize, ) -> std::io::Result<()> { // the length of the iterator. let length = iterator.size_hint().1.unwrap(); - let chunks = length / bitpacking::BLOCK_LEN; - let remainder = length - chunks * bitpacking::BLOCK_LEN; - let mut buffer = [0u32; bitpacking::BLOCK_LEN]; + let chunks = length / U32_BLOCK_LEN; + let remainder = length - chunks * U32_BLOCK_LEN; + let mut buffer = [0u32; U32_BLOCK_LEN]; - let compressed_chunk_size = ceil8(bitpacking::BLOCK_LEN * num_bits as usize); - // this is the upper bound: we do not know `num_bits` at compile time and thus can't allocate (on the stack) - // the exact length. - let mut compressed_chunk = [0u8; 4 * bitpacking::BLOCK_LEN]; + let compressed_chunk_size = ceil8(U32_BLOCK_LEN * num_bits); for _ in 0..chunks { - (0..bitpacking::BLOCK_LEN).for_each(|i| { - // infalible by construction - buffer[i] = iterator.next().unwrap() - }); - bitpacking::encode_pack(buffer, num_bits, compressed_chunk.as_mut()); - writer.write_all(&compressed_chunk[..compressed_chunk_size])?; + iterator + .by_ref() + .take(U32_BLOCK_LEN) + .zip(buffer.iter_mut()) + .for_each(|(item, buf)| *buf = item); + + let mut packed = [0u8; 4 * U32_BLOCK_LEN]; + bitpacked::encode_pack::(&buffer, num_bits, packed.as_mut()); + writer.write_all(&packed[..compressed_chunk_size])?; } if remainder != 0 { - iterator.enumerate().for_each(|(i, x)| { - buffer[i] = x; - }); let compressed_remainder_size = ceil8(remainder * num_bits as usize); - bitpacking::encode_pack(buffer, num_bits, compressed_chunk.as_mut()); - writer.write_all(&compressed_chunk[..compressed_remainder_size])?; + iterator + .by_ref() + .take(remainder) + .zip(buffer.iter_mut()) + .for_each(|(item, buf)| *buf = item); + + let mut packed = [0u8; 4 * U32_BLOCK_LEN]; + bitpacked::encode_pack(&buffer, num_bits, packed.as_mut()); + writer.write_all(&packed[..compressed_remainder_size])?; }; Ok(()) } diff --git a/src/encoding/hybrid_rle/mod.rs b/src/encoding/hybrid_rle/mod.rs index dbfbf94e9..f62cc2505 100644 --- a/src/encoding/hybrid_rle/mod.rs +++ b/src/encoding/hybrid_rle/mod.rs @@ -6,7 +6,7 @@ pub use bitmap::{encode_bool as bitpacked_encode, BitmapIter}; pub use decoder::Decoder; pub use encoder::{encode_bool, encode_u32}; -use super::bitpacking; +use super::bitpacked; /// The two possible states of an RLE-encoded run. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -21,7 +21,7 @@ pub enum HybridEncoded<'a> { #[derive(Debug, Clone)] enum State<'a> { None, - Bitpacked(bitpacking::Decoder<'a>), + Bitpacked(bitpacked::Decoder<'a, u32>), Rle(std::iter::Take>), } @@ -42,8 +42,8 @@ fn read_next<'a, 'b>(decoder: &'b mut Decoder<'a>, remaining: usize) -> State<'a match decoder.next() { Some(HybridEncoded::Bitpacked(packed)) => { let num_bits = decoder.num_bits(); - let length = std::cmp::min(packed.len() * 8 / num_bits as usize, remaining); - let decoder = bitpacking::Decoder::new(packed, num_bits as u8, length); + let length = std::cmp::min(packed.len() * 8 / num_bits, remaining); + let decoder = bitpacked::Decoder::::new(packed, num_bits, length); State::Bitpacked(decoder) } Some(HybridEncoded::Rle(pack, additional)) => { @@ -61,6 +61,7 @@ fn read_next<'a, 'b>(decoder: &'b mut Decoder<'a>, remaining: usize) -> State<'a impl<'a> HybridRleDecoder<'a> { /// Returns a new [`HybridRleDecoder`] pub fn new(data: &'a [u8], num_bits: u32, num_values: usize) -> Self { + let num_bits = num_bits as usize; let mut decoder = Decoder::new(data, num_bits); let state = read_next(&mut decoder, num_values); Self { @@ -106,13 +107,13 @@ mod tests { #[test] fn roundtrip() { let mut buffer = vec![]; - let num_bits = 10; + let num_bits = 10u32; let data = (0..1000).collect::>(); encode_u32(&mut buffer, data.iter().cloned(), num_bits).unwrap(); - let decoder = HybridRleDecoder::new(&buffer, num_bits as u32, data.len()); + let decoder = HybridRleDecoder::new(&buffer, num_bits, data.len()); let result = decoder.collect::>(); @@ -195,7 +196,7 @@ mod tests { ]; let num_bits = 10; - let decoder = HybridRleDecoder::new(&data, num_bits as u32, 1000); + let decoder = HybridRleDecoder::new(&data, num_bits, 1000); let result = decoder.collect::>(); @@ -208,7 +209,7 @@ mod tests { let num_bits = 3; - let decoder = HybridRleDecoder::new(&data, num_bits as u32, 1); + let decoder = HybridRleDecoder::new(&data, num_bits, 1); let result = decoder.collect::>(); @@ -221,7 +222,7 @@ mod tests { let num_bits = 0; - let decoder = HybridRleDecoder::new(&data, num_bits as u32, 2); + let decoder = HybridRleDecoder::new(&data, num_bits, 2); let result = decoder.collect::>(); @@ -234,7 +235,7 @@ mod tests { let num_bits = 1; - let decoder = HybridRleDecoder::new(&data, num_bits as u32, 100); + let decoder = HybridRleDecoder::new(&data, num_bits, 100); let result = decoder.collect::>(); diff --git a/src/encoding/mod.rs b/src/encoding/mod.rs index 928eca936..fa65956d0 100644 --- a/src/encoding/mod.rs +++ b/src/encoding/mod.rs @@ -1,6 +1,6 @@ use std::convert::TryInto; -pub mod bitpacking; +pub mod bitpacked; pub mod delta_bitpacked; pub mod delta_byte_array; pub mod delta_length_byte_array; diff --git a/src/write/file.rs b/src/write/file.rs index 4b7585423..ade6986c6 100644 --- a/src/write/file.rs +++ b/src/write/file.rs @@ -273,7 +273,6 @@ mod tests { // read it again: let result = read_metadata(&mut Cursor::new(a)); - println!("{result:?}"); assert!(result.is_ok()); Ok(()) diff --git a/tests/it/read/primitive_nested.rs b/tests/it/read/primitive_nested.rs index ece0fe28d..14b26ff40 100644 --- a/tests/it/read/primitive_nested.rs +++ b/tests/it/read/primitive_nested.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use super::{dictionary::PrimitivePageDict, Array}; use parquet2::{ - encoding::{bitpacking, hybrid_rle::HybridRleDecoder, uleb128, Encoding}, + encoding::{bitpacked, hybrid_rle::HybridRleDecoder, uleb128, Encoding}, error::{Error, Result}, page::{split_buffer, DataPage}, read::levels::get_bit_width, @@ -179,7 +179,7 @@ fn read_dict_array( let (_, consumed) = uleb128::decode(values); let values = &values[consumed..]; - let indices = bitpacking::Decoder::new(values, bit_width, length as usize); + let indices = bitpacked::Decoder::::new(values, bit_width as usize, length as usize); let values = indices.map(|id| dict_values[id as usize]);