Skip to content

Commit

Permalink
ConcurrentBloomInterval (solana-labs#34486)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Jan 2, 2024
1 parent 5349308 commit 6dea5a2
Showing 1 changed file with 40 additions and 1 deletion.
41 changes: 40 additions & 1 deletion bloom/src/bloom.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
//! Simple Bloom Filter
use {
bv::BitVec,
fnv::FnvHasher,
rand::{self, Rng},
serde::{Deserialize, Serialize},
solana_sdk::sanitize::{Sanitize, SanitizeError},
solana_sdk::{
sanitize::{Sanitize, SanitizeError},
timing::AtomicInterval,
},
std::{
cmp, fmt,
hash::Hasher,
marker::PhantomData,
ops::Deref,
sync::atomic::{AtomicU64, Ordering},
},
};
Expand Down Expand Up @@ -228,6 +233,40 @@ impl<T: BloomHashIndex> From<ConcurrentBloom<T>> for Bloom<T> {
}
}

/// Wrapper around `ConcurrentBloom` and `AtomicInterval` so the bloom filter
/// can be cleared periodically.
pub struct ConcurrentBloomInterval<T: BloomHashIndex> {
interval: AtomicInterval,
bloom: ConcurrentBloom<T>,
}

// Directly allow all methods of `AtomicBloom` to be called on `AtomicBloomInterval`.
impl<T: BloomHashIndex> Deref for ConcurrentBloomInterval<T> {
type Target = ConcurrentBloom<T>;
fn deref(&self) -> &Self::Target {
&self.bloom
}
}

impl<T: BloomHashIndex> ConcurrentBloomInterval<T> {
/// Create a new filter with the given parameters.
/// See `Bloom::random` for details.
pub fn new(num_items: usize, false_positive_rate: f64, max_bits: usize) -> Self {
let bloom = Bloom::random(num_items, false_positive_rate, max_bits);
Self {
interval: AtomicInterval::default(),
bloom: ConcurrentBloom::from(bloom),
}
}

/// Reset the filter if the reset interval has elapsed.
pub fn maybe_reset(&self, reset_interval_ms: u64) {
if self.interval.should_update(reset_interval_ms) {
self.bloom.clear();
}
}
}

#[cfg(test)]
mod test {
use {
Expand Down

0 comments on commit 6dea5a2

Please sign in to comment.