Skip to content

Commit

Permalink
Added bloom filter (jorgecarleitao#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored and dantengsky committed Apr 1, 2022
1 parent 8de1c3f commit 2dfce10
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 1 deletion.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ flate2 = { version = "^1.0", optional = true }
lz4 = { version = "1", optional = true }
zstd = { version = "^0.11", optional = true, default-features = false }

xxhash-rust = { version="0.8.3", optional = true, features = ["xxh64"] }

[features]
default = ["snappy", "gzip", "lz4", "zstd", "brotli", "stream"]
default = ["snappy", "gzip", "lz4", "zstd", "brotli", "stream", "bloom_filter"]
snappy = ["snap"]
gzip = ["flate2"]
stream = ["futures", "async-stream"]
bloom_filter = ["xxhash-rust"]
31 changes: 31 additions & 0 deletions examples/read_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use parquet2::error::Result;

// ANCHOR: deserialize
use parquet2::bloom_filter;
use parquet2::encoding::Encoding;
use parquet2::metadata::ColumnDescriptor;
use parquet2::page::{split_buffer, DataPage};
Expand Down Expand Up @@ -74,11 +75,40 @@ fn main() -> Result<()> {
let _max: i32 = stats.max_value.unwrap();
let _null_count: i64 = stats.null_count.unwrap();
}
PhysicalType::Int64 => {
let stats = stats
.as_any()
.downcast_ref::<PrimitiveStatistics<i64>>()
.unwrap();
let _min: i64 = stats.min_value.unwrap();
let _max: i64 = stats.max_value.unwrap();
let _null_count: i64 = stats.null_count.unwrap();
}
_ => todo!(),
}
}
// ANCHOR_END: statistics

// ANCHOR: bloom_filter
let mut bitset = vec![];
bloom_filter::read(column_metadata, &mut reader, &mut bitset)?;
if !bitset.is_empty() {
// there is a bitset, we can use it to check if elements are in the column chunk

// assume that our query engine had resulted in the filter `"column 0" == 100i64` (it also verified that column 0 is i64 in parquet)
let value = 100i64;

// we hash this value
let hash = bloom_filter::hash_native(value);

// and check if the hash is in the bitset.
let _in_set = bloom_filter::is_in_set(&bitset, hash);
// if not (false), we could skip this entire row group, because no item hits the filter
// this can naturally be applied over multiple columns.
// if yes (true), the item _may_ be in the row group, and we usually can't skip it.
}
// ANCHOR_END: bloom_filter

// ANCHOR: pages
use parquet2::read::get_page_iterator;
let pages = get_page_iterator(column_metadata, &mut reader, None, vec![])?;
Expand All @@ -93,5 +123,6 @@ fn main() -> Result<()> {
let _array = deserialize(&page, column_metadata.descriptor());
}
// ANCHOR_END: decompress

Ok(())
}
11 changes: 11 additions & 0 deletions guide/src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,14 @@ which can be downcasted via its `Statistics::physical_type()`:
```rust,no_run,noplayground
{{#include ../../examples/read_metadata.rs:statistics}}
```

## Bloom filters

The metadata of columns can contain bloom filter bitsets that
can be used to push down filter predicates.

This crate offers the necessary functionality to check whether an item is *definitely not* in a column chunk (a bloom filter can prove absence, but not presence):

```rust,no_run,noplayground
{{#include ../../examples/read_metadata.rs:bloom_filter}}
```
17 changes: 17 additions & 0 deletions src/bloom_filter/hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use xxhash_rust::xxh64::xxh64;

use crate::types::NativeType;

const SEED: u64 = 0;

/// (xxh64) hash of a [`NativeType`], computed over its little-endian byte representation.
#[inline]
pub fn hash_native<T: NativeType>(value: T) -> u64 {
    // hash the fixed-width little-endian encoding with the fixed seed
    let bytes = value.to_le_bytes();
    xxh64(bytes.as_ref(), SEED)
}

/// (xxh64) hash of a sequence of bytes (e.g. ByteArray).
#[inline]
pub fn hash_byte<A: AsRef<[u8]>>(value: A) -> u64 {
    let data = value.as_ref();
    xxh64(data, SEED)
}
71 changes: 71 additions & 0 deletions src/bloom_filter/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//! API to read and use bloom filters
mod hash;
mod read;
mod split_block;

pub use hash::{hash_byte, hash_native};
pub use read::read;
pub use split_block::{insert, is_in_set};

#[cfg(test)]
mod tests {
    use super::*;

    /// Inserts the hashes of 0..10i64, compares the resulting bitset against a
    /// fixture produced by parquet-mr/spark, then checks membership via
    /// `is_in_set`.
    #[test]
    fn basics() {
        let mut bitset = vec![0; 32];

        // insert the hashes of the values 0..10
        for a in 0..10i64 {
            let hash = hash_native(a);
            insert(&mut bitset, hash);
        }

        // bloom filter produced by parquet-mr/spark for an i64 column whose
        // distinct values are 0..=9 (each repeated 10 times; see snippet below)
        /*
        import pyspark.sql  // 3.2.1
        spark = pyspark.sql.SparkSession.builder.getOrCreate()
        spark.conf.set("parquet.bloom.filter.enabled", True)
        spark.conf.set("parquet.bloom.filter.expected.ndv", 10)
        spark.conf.set("parquet.bloom.filter.max.bytes", 32)
        data = [(i % 10,) for i in range(100)]
        df = spark.createDataFrame(data, ["id"]).repartition(1)
        df.write.parquet("bla.parquet", mode = "overwrite")
        */
        let expected: &[u8] = &[
            24, 130, 24, 8, 134, 8, 68, 6, 2, 101, 128, 10, 64, 2, 38, 78, 114, 1, 64, 38, 1, 192,
            194, 152, 64, 70, 0, 36, 56, 121, 64, 0,
        ];
        assert_eq!(bitset, expected);

        // check membership: values 0..10 were inserted and must be reported
        // present; 10 was not inserted and, with this particular bitset, does
        // not collide (bloom filters may report false positives in general)
        for a in 0..11i64 {
            let hash = hash_native(a);

            let valid = is_in_set(&bitset, hash);

            assert_eq!(a < 10, valid);
        }
    }

    /// Same as `basics` but exercising byte hashing: inserts the hashes of the
    /// strings "a0".."a9" and compares against a spark-produced fixture.
    #[test]
    fn binary() {
        let mut bitset = vec![0; 32];

        // insert the hashes of the strings "a0".."a9"
        for a in 0..10i64 {
            let value = format!("a{}", a);
            let hash = hash_byte(value);
            insert(&mut bitset, hash);
        }

        // bloom filter produced by parquet-mr/spark for a string (binary)
        // column with values f"a{i}" for i in 0..10
        let expected: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        assert_eq!(bitset, expected);
    }
}
54 changes: 54 additions & 0 deletions src/bloom_filter/read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::io::{Read, Seek, SeekFrom};

use parquet_format_async_temp::{
thrift::protocol::TCompactInputProtocol, BloomFilterAlgorithm, BloomFilterCompression,
BloomFilterHeader, SplitBlockAlgorithm, Uncompressed,
};

use crate::{error::ParquetError, metadata::ColumnChunkMetaData};

/// Reads the bloom filter associated to [`ColumnChunkMetaData`] into `bitset`.
/// Results in an empty `bitset` if there is no associated bloom filter or the algorithm is not supported.
/// # Error
/// Errors if the column contains no metadata, if the filter can't be read or
/// deserialized, or if the header declares a negative bitset size.
pub fn read<R: Read + Seek>(
    column_metadata: &ColumnChunkMetaData,
    mut reader: &mut R,
    bitset: &mut Vec<u8>,
) -> Result<(), ParquetError> {
    let offset = column_metadata
        .metadata()
        .ok_or_else(|| ParquetError::OutOfSpec("Column metadata is required".to_string()))?
        .bloom_filter_offset;

    // no bloom filter for this column chunk => signal it via an empty bitset
    let offset = if let Some(offset) = offset {
        offset as u64
    } else {
        bitset.clear();
        return Ok(());
    };
    reader.seek(SeekFrom::Start(offset))?;

    // deserialize the thrift-encoded header that precedes the bitset
    let mut prot = TCompactInputProtocol::new(&mut reader);
    let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;

    // only the split-block algorithm over uncompressed bitsets is supported;
    // anything else is reported as "no filter available" (empty bitset)
    if header.algorithm != BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}) {
        bitset.clear();
        return Ok(());
    }
    if header.compression != BloomFilterCompression::UNCOMPRESSED(Uncompressed {}) {
        bitset.clear();
        return Ok(());
    }

    // guard against a corrupt header: a negative size would wrap to a huge
    // usize under `as` and trigger an enormous allocation below
    if header.num_bytes < 0 {
        return Err(ParquetError::OutOfSpec(format!(
            "Bloom filter header declares a negative number of bytes ({})",
            header.num_bytes
        )));
    }
    let num_bytes = header.num_bytes as usize;

    // size the buffer, reusing its existing allocation when large enough
    bitset.clear();
    bitset.resize(num_bytes, 0);

    reader.read_exact(bitset)?;
    Ok(())
}
82 changes: 82 additions & 0 deletions src/bloom_filter/split_block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::convert::TryInto;

/// Magic salt constants of the split-block bloom filter, taken from
/// <https://github.com/apache/parquet-format/blob/master/BloomFilter.md>
const SALT: [u32; 8] = [
1203114875, 1150766481, 2284105051, 2729912477, 1884591559, 770785867, 2667333959, 1550580529,
];

/// Maps `hash` to the index of the 32-byte block it belongs to,
/// reducing the *upper* 32 bits of the hash onto `[0, len / 32)`.
fn hash_to_block_index(hash: u64, len: usize) -> usize {
    let number_of_blocks = len as u64 / 32;
    // the original called this `low_hash`, but `hash >> 32` selects the
    // high half of the hash
    let high_bits = hash >> 32;
    (((high_bits * number_of_blocks) >> 32) as u32) as usize
}

/// Computes the probe mask for `x`: one bit set per 32-bit word of a block.
fn new_mask(x: u32) -> [u32; 8] {
    let mut mask = [0u32; 8];
    for (slot, salt) in mask.iter_mut().zip(SALT.iter()) {
        // the top 5 bits of x * salt choose which bit of this word to set
        *slot = 1u32 << (x.wrapping_mul(*salt) >> 27);
    }
    mask
}

/// Loads the first 32 bytes of `bitset` onto the stack as 8 little-endian
/// u32 words.
#[inline]
fn load_block(bitset: &[u8]) -> [u32; 8] {
    let mut block = [0u32; 8];
    for (word, bytes) in block.iter_mut().zip(bitset.chunks_exact(4)) {
        *word = u32::from_le_bytes(bytes.try_into().unwrap());
    }
    block
}

/// Writes a stack-resident block back into the first 32 bytes of `bitset`
/// as 8 little-endian u32 words.
#[inline]
fn unload_block(block: [u32; 8], bitset: &mut [u8]) {
    let chunks = bitset.chunks_exact_mut(4).take(8);
    for (word, chunk) in block.iter().zip(chunks) {
        // copy_from_slice compiles to a 4-byte memcpy, replacing the
        // previous byte-by-byte manual assignment
        chunk.copy_from_slice(&word.to_le_bytes());
    }
}

/// Returns whether `hash` is in the set: `false` means "definitely absent",
/// `true` means "possibly present".
pub fn is_in_set(bitset: &[u8], hash: u64) -> bool {
    let block_index = hash_to_block_index(hash, bitset.len());
    let key = hash as u32;

    let mask = new_mask(key);
    let block = load_block(&bitset[block_index * 32..(block_index + 1) * 32]);

    // the hash is (possibly) present iff every probe bit is set in the block
    mask.iter().zip(block.iter()).all(|(m, b)| m & b != 0)
}

/// Inserts a new hash into the set.
pub fn insert(bitset: &mut [u8], hash: u64) {
    let block_index = hash_to_block_index(hash, bitset.len());
    let key = hash as u32;

    let mask = new_mask(key);
    let slice = &bitset[block_index * 32..(block_index + 1) * 32];
    let mut block_mask = load_block(slice);

    // set every probe bit in the stack copy of the block
    for i in 0..8 {
        block_mask[i] |= mask[i];
    }

    // write the block back exactly once; the original unloaded it inside the
    // loop above, performing 8 full writes where one suffices
    let mut_slice = &mut bitset[block_index * 32..(block_index + 1) * 32];
    unload_block(block_mask, mut_slice)
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#[macro_use]
pub mod error;
#[cfg(feature = "bloom_filter")]
pub mod bloom_filter;
pub mod compression;
pub mod encoding;
pub mod metadata;
Expand Down

0 comments on commit 2dfce10

Please sign in to comment.