From 2dfce10d97b85804df88e7acbb25b85ca6073e6b Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Wed, 16 Mar 2022 22:52:41 +0100 Subject: [PATCH] Added bloom filter (#99) --- Cargo.toml | 5 +- examples/read_metadata.rs | 31 +++++++++++++ guide/src/README.md | 11 +++++ src/bloom_filter/hash.rs | 17 +++++++ src/bloom_filter/mod.rs | 71 ++++++++++++++++++++++++++++ src/bloom_filter/read.rs | 54 ++++++++++++++++++++++ src/bloom_filter/split_block.rs | 82 +++++++++++++++++++++++++++++++++ src/lib.rs | 2 + 8 files changed, 272 insertions(+), 1 deletion(-) create mode 100644 src/bloom_filter/hash.rs create mode 100644 src/bloom_filter/mod.rs create mode 100644 src/bloom_filter/read.rs create mode 100644 src/bloom_filter/split_block.rs diff --git a/Cargo.toml b/Cargo.toml index 957762ec9..a9ef0d204 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,8 +25,11 @@ flate2 = { version = "^1.0", optional = true } lz4 = { version = "1", optional = true } zstd = { version = "^0.11", optional = true, default-features = false } +xxhash-rust = { version="0.8.3", optional = true, features = ["xxh64"] } + [features] -default = ["snappy", "gzip", "lz4", "zstd", "brotli", "stream"] +default = ["snappy", "gzip", "lz4", "zstd", "brotli", "stream", "bloom_filter"] snappy = ["snap"] gzip = ["flate2"] stream = ["futures", "async-stream"] +bloom_filter = ["xxhash-rust"] diff --git a/examples/read_metadata.rs b/examples/read_metadata.rs index 8085abb6d..18aa8a819 100644 --- a/examples/read_metadata.rs +++ b/examples/read_metadata.rs @@ -1,6 +1,7 @@ use parquet2::error::Result; // ANCHOR: deserialize +use parquet2::bloom_filter; use parquet2::encoding::Encoding; use parquet2::metadata::ColumnDescriptor; use parquet2::page::{split_buffer, DataPage}; @@ -74,11 +75,40 @@ fn main() -> Result<()> { let _max: i32 = stats.max_value.unwrap(); let _null_count: i64 = stats.null_count.unwrap(); } + PhysicalType::Int64 => { + let stats = stats + .as_any() + .downcast_ref::>() + .unwrap(); + let _min: i64 = 
stats.min_value.unwrap(); + let _max: i64 = stats.max_value.unwrap(); + let _null_count: i64 = stats.null_count.unwrap(); + } _ => todo!(), } } // ANCHOR_END: statistics + // ANCHOR: bloom_filter + let mut bitset = vec![]; + bloom_filter::read(column_metadata, &mut reader, &mut bitset)?; + if !bitset.is_empty() { + // there is a bitset, we can use it to check if elements are in the column chunk + + // assume that our query engine had resulted in the filter `"column 0" == 100i64` (it also verified that column 0 is i64 in parquet) + let value = 100i64; + + // we hash this value + let hash = bloom_filter::hash_native(value); + + // and check if the hash is in the bitset. + let _in_set = bloom_filter::is_in_set(&bitset, hash); + // if not (false), we could skip this entire row group, because no item hits the filter + // this can naturally be applied over multiple columns. + // if yes (true), the item _may_ be in the row group, and we usually can't skip it. + } + // ANCHOR_END: bloom_filter + // ANCHOR: pages use parquet2::read::get_page_iterator; let pages = get_page_iterator(column_metadata, &mut reader, None, vec![])?; @@ -93,5 +123,6 @@ fn main() -> Result<()> { let _array = deserialize(&page, column_metadata.descriptor()); } // ANCHOR_END: decompress + Ok(()) } diff --git a/guide/src/README.md b/guide/src/README.md index b564cf819..2099940c0 100644 --- a/guide/src/README.md +++ b/guide/src/README.md @@ -110,3 +110,14 @@ which can be downcasted via its `Statistics::physical_type()`: ```rust,no_run,noplayground {{#include ../../examples/read_metadata.rs:statistics}} ``` + +## Bloom filters + +The metadata of columns can contain bloom filter bitsets that +can be used to pushdown filter operations. 
+
+This crate offers the necessary functionality to check whether an item is not in a column chunk:
+
+```rust,no_run,noplayground
+{{#include ../../examples/read_metadata.rs:bloom_filter}}
+```
diff --git a/src/bloom_filter/hash.rs b/src/bloom_filter/hash.rs
new file mode 100644
index 000000000..252c72bbf
--- /dev/null
+++ b/src/bloom_filter/hash.rs
@@ -0,0 +1,17 @@
+use xxhash_rust::xxh64::xxh64;
+
+use crate::types::NativeType;
+
+const SEED: u64 = 0;
+
+/// Returns the xxh64 hash (seed 0) of the little-endian bytes of a [`NativeType`].
+#[inline]
+pub fn hash_native<T: NativeType>(value: T) -> u64 {
+    xxh64(value.to_le_bytes().as_ref(), SEED)
+}
+
+/// Returns the xxh64 hash (seed 0) of a sequence of bytes (e.g. ByteArray).
+#[inline]
+pub fn hash_byte<A: AsRef<[u8]>>(value: A) -> u64 {
+    xxh64(value.as_ref(), SEED)
+}
diff --git a/src/bloom_filter/mod.rs b/src/bloom_filter/mod.rs
new file mode 100644
index 000000000..218715d7a
--- /dev/null
+++ b/src/bloom_filter/mod.rs
@@ -0,0 +1,71 @@
+//! API to read and use bloom filters
+mod hash;
+mod read;
+mod split_block;
+
+pub use hash::{hash_byte, hash_native};
+pub use read::read;
+pub use split_block::{insert, is_in_set};
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn basics() {
+        let mut bitset = vec![0; 32];
+
+        // insert
+        for a in 0..10i64 {
+            let hash = hash_native(a);
+            insert(&mut bitset, hash);
+        }
+
+        // bloom filter produced by parquet-mr/spark for a column of i64 (0..10)
+        /*
+        import pyspark.sql // 3.2.1
+        spark = pyspark.sql.SparkSession.builder.getOrCreate()
+        spark.conf.set("parquet.bloom.filter.enabled", True)
+        spark.conf.set("parquet.bloom.filter.expected.ndv", 10)
+        spark.conf.set("parquet.bloom.filter.max.bytes", 32)
+
+        data = [(i % 10,) for i in range(100)]
+        df = spark.createDataFrame(data, ["id"]).repartition(1)
+
+        df.write.parquet("bla.parquet", mode = "overwrite")
+        */
+        let expected: &[u8] = &[
+            24, 130, 24, 8, 134, 8, 68, 6, 2, 101, 128, 10, 64, 2, 38, 78, 114, 1, 64, 38, 1, 192,
+            194, 152, 64, 70, 0, 36, 56, 121, 64, 0,
+        ];
+        assert_eq!(bitset, expected);
+
+        // check
+        for a in 0..11i64 {
+            let hash = hash_native(a);
+
+            let valid = is_in_set(&bitset, hash);
+
+            assert_eq!(a < 10, valid);
+        }
+    }
+
+    #[test]
+    fn binary() {
+        let mut bitset = vec![0; 32];
+
+        // insert
+        for a in 0..10i64 {
+            let value = format!("a{}", a);
+            let hash = hash_byte(value);
+            insert(&mut bitset, hash);
+        }
+
+        // bloom filter produced by parquet-mr/spark for a column of strings f"a{i}" for i in 0..10
+        let expected: &[u8] = &[
+            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
+            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
+        ];
+        assert_eq!(bitset, expected);
+    }
+}
diff --git a/src/bloom_filter/read.rs b/src/bloom_filter/read.rs
new file mode 100644
index 000000000..e06a1eb97
--- /dev/null
+++ b/src/bloom_filter/read.rs
@@ -0,0 +1,54 @@
+use std::io::{Read, Seek, SeekFrom};
+
+use parquet_format_async_temp::{
+    thrift::protocol::TCompactInputProtocol, BloomFilterAlgorithm, BloomFilterCompression,
+    BloomFilterHeader, SplitBlockAlgorithm, Uncompressed,
+};
+
+use crate::{error::ParquetError, metadata::ColumnChunkMetaData};
+
+/// Reads the bloom filter associated to [`ColumnChunkMetaData`] into `bitset`.
+/// Results in an empty `bitset` if there is no associated bloom filter or its algorithm or compression is not supported.
+/// # Errors
+/// Errors if the column contains no metadata, the filter declares a negative length, or it can't be read or deserialized.
+pub fn read<R: Read + Seek>(
+    column_metadata: &ColumnChunkMetaData,
+    mut reader: &mut R,
+    bitset: &mut Vec<u8>,
+) -> Result<(), ParquetError> {
+    let offset = column_metadata
+        .metadata()
+        .ok_or_else(|| ParquetError::OutOfSpec("Column metadata is required".to_string()))?
+        .bloom_filter_offset;
+
+    let offset = if let Some(offset) = offset {
+        offset as u64
+    } else {
+        bitset.clear();
+        return Ok(());
+    };
+    reader.seek(SeekFrom::Start(offset))?;
+
+    // deserialize header
+    let mut prot = TCompactInputProtocol::new(&mut reader);
+    let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;
+
+    if header.algorithm != BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}) {
+        bitset.clear();
+        return Ok(());
+    }
+    if header.compression != BloomFilterCompression::UNCOMPRESSED(Uncompressed {}) {
+        bitset.clear();
+        return Ok(());
+    }
+    // `num_bytes` is signed: a negative value cast with `as` would request a huge allocation
+    if header.num_bytes < 0 {
+        return Err(ParquetError::OutOfSpec("Bloom filter length is negative".to_string()));
+    }
+    // read bitset
+    bitset.clear();
+    bitset.resize(header.num_bytes as usize, 0); // populate with zeros
+
+    reader.read_exact(bitset)?;
+    Ok(())
+}
diff --git a/src/bloom_filter/split_block.rs b/src/bloom_filter/split_block.rs
new file mode 100644
index 000000000..576f4d5f1
--- /dev/null
+++ b/src/bloom_filter/split_block.rs
@@ -0,0 +1,82 @@
+use std::convert::TryInto;
+
+/// magic numbers taken from https://github.com/apache/parquet-format/blob/master/BloomFilter.md
+const SALT: [u32; 8] = [
+    1203114875, 1150766481, 2284105051, 2729912477, 1884591559, 770785867, 2667333959, 1550580529,
+];
+
+fn hash_to_block_index(hash: u64, len: usize) -> usize {
+    let number_of_blocks = len as u64 / 32;
+    let high_hash = hash >> 32; // the upper 32 bits of the hash select the block
+    let block_index = ((high_hash * number_of_blocks) >> 32) as u32;
+    block_index as usize
+}
+
+fn new_mask(x: u32) -> [u32; 8] {
+    let mut a = [0u32; 8];
+    for i in 0..8 {
+        let mask = x.wrapping_mul(SALT[i]);
+        let mask = mask >> 27;
+        let mask = 0x1 << mask;
+        a[i] = mask;
+    }
+    a
+}
+
+/// loads a block (8 little-endian `u32` words) from the bitset to the stack
+#[inline]
+fn load_block(bitset: &[u8]) -> [u32; 8] {
+    let mut a = [0u32; 8];
+    let bitset = bitset.chunks_exact(4).take(8);
+    for (a, chunk) in a.iter_mut().zip(bitset) {
+        *a = u32::from_le_bytes(chunk.try_into().unwrap())
+    }
+    a
+}
+
+/// Writes `block` to (up to) the first 32 bytes of `bitset` as 8 little-endian `u32` words
+#[inline]
+fn unload_block(block: [u32; 8], bitset: &mut [u8]) {
+    let bitset = bitset.chunks_exact_mut(4).take(8);
+    for (a, chunk) in block.iter().zip(bitset) {
+        let a = a.to_le_bytes();
+        chunk[0] = a[0];
+        chunk[1] = a[1];
+        chunk[2] = a[2];
+        chunk[3] = a[3];
+    }
+}
+
+/// Returns whether `hash` may be in the set; panics if `bitset` is shorter than one 32-byte block
+pub fn is_in_set(bitset: &[u8], hash: u64) -> bool {
+    let block_index = hash_to_block_index(hash, bitset.len());
+    let key = hash as u32;
+
+    let mask = new_mask(key);
+    let slice = &bitset[block_index * 32..(block_index + 1) * 32];
+    let block_mask = load_block(slice);
+
+    for i in 0..8 {
+        if mask[i] & block_mask[i] == 0 {
+            return false;
+        }
+    }
+    true
+}
+
+/// Inserts `hash` into the set; panics if `bitset` is shorter than one 32-byte block
+pub fn insert(bitset: &mut [u8], hash: u64) {
+    let block_index = hash_to_block_index(hash, bitset.len());
+    let key = hash as u32;
+
+    let mask = new_mask(key);
+    let slice = &bitset[block_index * 32..(block_index + 1) * 32];
+    let mut block_mask = load_block(slice);
+
+    for i in 0..8 {
+        block_mask[i] |= mask[i];
+    }
+    // write the block back once, after all 8 words have been updated
+    let mut_slice = &mut bitset[block_index * 32..(block_index + 1) * 32];
+    unload_block(block_mask, mut_slice)
+}
diff --git a/src/lib.rs b/src/lib.rs
index bddd62b8c..11b7528e9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,6 +3,8 @@
 
 #[macro_use]
 pub mod error;
+#[cfg(feature = "bloom_filter")]
+pub mod bloom_filter;
 pub mod compression;
 pub mod encoding;
 pub mod metadata;