Skip to content

Commit

Permalink
#15 implement key expiration and cache eviction policy (#16)
Browse files Browse the repository at this point in the history
* Added support for key expiration

* Added key expiration and ttl support

* Removed debug lines

* clippy
  • Loading branch information
ezrasingh authored Aug 16, 2024
1 parent 0179c15 commit e67e217
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 74 deletions.
6 changes: 3 additions & 3 deletions geoprox-core/benches/spatial_index_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn random_query(rng: &mut ThreadRng) -> (LatLngCoord, f64) {
/// Builds a `SpatialIndex` (constructed via `SpatialIndex::new(*size as usize)`) and
/// inserts one entry per integer in `0..*size`, keyed by the integer's decimal string
/// and placed at a geohash drawn from `random_geohash(rng)`.
///
/// The post-commit call passes `None` as the expiration argument.
fn seed_index(size: &i32, rng: &mut ThreadRng) -> SpatialIndex {
let mut geo_index = SpatialIndex::new(*size as usize);
(0..*size).for_each(|n| {
geo_index.insert(&n.to_string(), &random_geohash(rng));
geo_index.insert(&n.to_string(), &random_geohash(rng), None);
});
geo_index
}
Expand All @@ -60,7 +60,7 @@ fn insert_benchmark(c: &mut Criterion) {
|| (SpatialIndex::new(capacity as usize), data.clone()),
|(mut geo_index, data)| {
data.iter()
.for_each(|(key, ghash)| black_box(geo_index.insert(key, ghash)))
.for_each(|(key, ghash)| black_box(geo_index.insert(key, ghash, None)))
},
BatchSize::LargeInput,
);
Expand All @@ -73,7 +73,7 @@ fn insert_benchmark(c: &mut Criterion) {
|b, &capacity| {
b.iter_batched(
|| (SpatialIndex::new(capacity as usize), data.clone()),
|(mut geo_index, data)| black_box(geo_index.insert_many(data)),
|(mut geo_index, data)| black_box(geo_index.insert_many(data, None)),
BatchSize::LargeInput,
);
},
Expand Down
202 changes: 139 additions & 63 deletions geoprox-core/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use kiddo::KdTree;
use log::debug;
use patricia_tree::StringPatriciaMap;
use rayon::prelude::*;
use std::collections::BTreeSet;
use std::hash::{BuildHasher, BuildHasherDefault};
use std::time::{Duration, Instant};

#[inline]
fn build_search_space(
Expand Down Expand Up @@ -38,10 +40,12 @@ fn build_search_space(
pub struct SpatialIndex {
/// maps a geohash to the set of object ids currently stored under it
prefix_tree: StringPatriciaMap<HashSet<ObjectIdentifier>>,
/// maps object key, geohash to hash id (object id)
position_map: HashTable<[String; 2]>,
/// table of `(object key, geohash, optional expiration instant)` entries,
/// hashed by the object key (the hash doubles as the object id)
position_map: HashTable<(String, String, Option<Instant>)>,
/// internal hasher used to derive object ids from keys
hasher: BuildHasherDefault<AHasher>,
/// ordered set of `(expiration instant, object key)` pairs; sorted by instant first,
/// so `purge` can take everything up to "now" as one range
expirations: BTreeSet<(Instant, String)>,
}

impl SpatialIndex {
Expand All @@ -62,16 +66,28 @@ impl SpatialIndex {
}
}

/// Evicts every key whose expiration instant is not later than the current time.
///
/// The expired keys are gathered from `expirations`, handed to `remove_many`,
/// and `expirations` is then reduced to the entries at or after the cutoff.
pub fn purge(&mut self) {
    // Tuples order by instant first; the empty string is the smallest possible key,
    // so this bound sits at the very start of the entries stamped with "now".
    let cutoff = (Instant::now(), String::new());

    let stale_keys: HashSet<String> = self
        .expirations
        .range(..=&cutoff)
        .map(|(_, expired_key)| expired_key.clone())
        .collect();
    self.remove_many(stale_keys);

    // `split_off` returns the part of the set that is >= cutoff; keep only that part.
    let still_live = self.expirations.split_off(&cutoff);
    self.expirations = still_live;
}

/// Insert key into index at some geographical location
pub fn insert(&mut self, key: &str, ghash: &str) {
pub fn insert(&mut self, key: &str, ghash: &str, expiration: Option<Duration>) {
let id: ObjectIdentifier = self.hasher.hash_one(key);
// ? remove object from previous locations
if let Entry::Occupied(entry) = self.position_map.entry(
id,
|[key, _]| key.eq(key),
|[key, _]| self.hasher.hash_one(key),
|(key, _, _)| key.eq(key),
|(key, _, _)| self.hasher.hash_one(key),
) {
let ([_, old_ghash], _) = entry.remove();
let ((key, old_ghash, old_expiration), _) = entry.remove();
debug!(
"removing object from previous ghash: id={} geohash={}",
id, &old_ghash
Expand All @@ -81,14 +97,31 @@ impl SpatialIndex {
} else {
self.prefix_tree.remove(old_ghash);
}
if let Some(expire_key) = old_expiration {
debug!("removing old expiration for: id={}", id);
self.expirations.remove(&(expire_key, key));
}
}

// ? update position_map & prefix_tree
debug!("storing object: id={} key={}", id, key);
if let Some(duration) = expiration {
// ? update expiration tree
let expire_at = Instant::now() + duration;
self.position_map.insert_unique(
id,
(key.to_owned(), ghash.into(), Some(expire_at)),
|(key, _, _)| self.hasher.hash_one(key),
);
self.expirations.insert((expire_at, key.to_string()));
} else {
self.position_map.insert_unique(
id,
(key.to_owned(), ghash.into(), None),
|(key, _, _)| self.hasher.hash_one(key),
);
}

self.position_map
.insert_unique(id, [key.to_owned(), ghash.into()], |[key, _]| {
self.hasher.hash_one(key)
});
// ? insert current region into prefix tree
if let Some(members) = self.prefix_tree.get_mut(ghash) {
members.insert(id);
Expand All @@ -98,27 +131,34 @@ impl SpatialIndex {
}

/// Insert multiple objects at once.
///
/// Each `(key, geohash)` pair from `objects` is passed to `insert` in iteration order;
/// in the post-commit signature the same optional `expiration` is applied to every pair.
pub fn insert_many(&mut self, objects: impl IntoIterator<Item = (String, String)>) {
pub fn insert_many(
&mut self,
objects: impl IntoIterator<Item = (String, String)>,
expiration: Option<Duration>,
) {
objects
.into_iter()
.for_each(|(key, ghash)| self.insert(&key, &ghash));
.for_each(|(key, ghash)| self.insert(&key, &ghash, expiration));
}

/// Remove key from index
///
/// Looks `key` up in `position_map` by its hash. When an entry is found it is taken
/// out of the table, its id is dropped from the geohash bucket in `prefix_tree`
/// (the bucket itself is removed once empty), and any `(expiration, key)` pair is
/// deleted from `expirations`.
///
/// Returns `true` in every case, including when `key` was not present.
// NOTE(review): the unconditional `true` is preserved for existing callers; confirm
// whether "was anything removed" is the intended meaning before changing it.
pub fn remove(&mut self, key: &str) -> bool {
    let id: ObjectIdentifier = self.hasher.hash_one(key);
    if let Entry::Occupied(entry) = self.position_map.entry(
        id,
        // Compare the stored key with the requested one. The closure parameter must
        // not be named `key`: shadowing turned the check into `key == key`, which
        // accepted any entry reached while probing.
        |(stored_key, _, _)| stored_key.as_str() == key,
        |(stored_key, _, _)| self.hasher.hash_one(stored_key),
    ) {
        let ((stored_key, ghash, old_expiration), _) = entry.remove();
        if let Some(members) = self.prefix_tree.get_mut(&ghash) {
            members.remove(&id);
            if members.is_empty() {
                self.prefix_tree.remove(&ghash);
            }
        }
        if let Some(expire_key) = old_expiration {
            self.expirations.remove(&(expire_key, stored_key));
        }
    }
    true
}
Expand Down Expand Up @@ -162,10 +202,10 @@ impl SpatialIndex {
.par_iter()
.filter_map(|node| {
self.position_map
.find(node.item, |[key, _]| {
.find(node.item, |(key, _, _)| {
self.hasher.hash_one(key).eq(&node.item)
})
.map(|[key, _]| Neighbor {
.map(|(key, _, _)| Neighbor {
distance: node.distance,
key: key.to_owned(),
})
Expand Down Expand Up @@ -195,23 +235,23 @@ mod test {
let key = "test-key";

// place key within the search area
geo_index.insert(key, &encode_lat_lng(origin, insert_depth));
geo_index.insert(key, &encode_lat_lng(origin, insert_depth), None);

let res = geo_index
.search(origin, range, 100, false, DEFAULT_DEPTH)
.unwrap();
assert_eq!(res.len(), 1);

// move key out of search area
geo_index.insert(key, &encode_lat_lng([-70.0, 100.0], insert_depth));
geo_index.insert(key, &encode_lat_lng([-70.0, 100.0], insert_depth), None);

let res = geo_index
.search(origin, range, 100, false, DEFAULT_DEPTH)
.unwrap();
assert_eq!(res.len(), 0);

// move key back into search area
geo_index.insert(key, &encode_lat_lng(origin, insert_depth));
geo_index.insert(key, &encode_lat_lng(origin, insert_depth), None);

let res = geo_index
.search(origin, range, 100, false, DEFAULT_DEPTH)
Expand All @@ -226,8 +266,8 @@ mod test {
let range = 10.0;
let origin: LatLngCoord = [0.0, 0.0];

geo_index.insert(&"a", &encode_lat_lng(origin, depth));
geo_index.insert(&"b", &encode_lat_lng(origin, depth));
geo_index.insert(&"a", &encode_lat_lng(origin, depth), None);
geo_index.insert(&"b", &encode_lat_lng(origin, depth), None);

let res = geo_index
.search(origin, range, 100, false, DEFAULT_DEPTH)
Expand All @@ -252,16 +292,19 @@ mod test {
let sorted = false;
let origin: LatLngCoord = [0.0, 0.0];

geo_index.insert_many(vec![
("a".to_string(), encode_lat_lng([1.0, 0.0], depth)),
("b".to_string(), encode_lat_lng([1.0, 1.0], depth)),
("c".to_string(), encode_lat_lng([0.0, 1.0], depth)),
("d".to_string(), encode_lat_lng([0.0, 0.0], depth)),
("e".to_string(), encode_lat_lng([-1., 0.0], depth)),
("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)),
("g".to_string(), encode_lat_lng([0.0, -1.0], depth)),
("h".to_string(), encode_lat_lng([0.0, 0.0], depth)),
]);
geo_index.insert_many(
vec![
("a".to_string(), encode_lat_lng([1.0, 0.0], depth)),
("b".to_string(), encode_lat_lng([1.0, 1.0], depth)),
("c".to_string(), encode_lat_lng([0.0, 1.0], depth)),
("d".to_string(), encode_lat_lng([0.0, 0.0], depth)),
("e".to_string(), encode_lat_lng([-1., 0.0], depth)),
("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)),
("g".to_string(), encode_lat_lng([0.0, -1.0], depth)),
("h".to_string(), encode_lat_lng([0.0, 0.0], depth)),
],
None,
);

let res = geo_index
.search(origin, range, count, sorted, DEFAULT_DEPTH)
Expand All @@ -281,16 +324,19 @@ mod test {
let sorted = false;
let origin: LatLngCoord = [0.0, 0.0];

geo_index.insert_many(vec![
("a".to_string(), encode_lat_lng([1.0, 0.0], depth)),
("b".to_string(), encode_lat_lng([1.0, 1.0], depth)),
("c".to_string(), encode_lat_lng([0.0, 1.0], depth)),
("d".to_string(), encode_lat_lng([0.0, 0.0], depth)),
("e".to_string(), encode_lat_lng([-1., 0.0], depth)),
("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)),
("g".to_string(), encode_lat_lng([0.0, -1.0], depth)),
("h".to_string(), encode_lat_lng([0.0, 0.0], depth)),
]);
geo_index.insert_many(
vec![
("a".to_string(), encode_lat_lng([1.0, 0.0], depth)),
("b".to_string(), encode_lat_lng([1.0, 1.0], depth)),
("c".to_string(), encode_lat_lng([0.0, 1.0], depth)),
("d".to_string(), encode_lat_lng([0.0, 0.0], depth)),
("e".to_string(), encode_lat_lng([-1., 0.0], depth)),
("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)),
("g".to_string(), encode_lat_lng([0.0, -1.0], depth)),
("h".to_string(), encode_lat_lng([0.0, 0.0], depth)),
],
None,
);

let res = geo_index
.search(origin, range, count, sorted, DEFAULT_DEPTH)
Expand All @@ -306,16 +352,19 @@ mod test {
let range = 1000.0;
let origin: LatLngCoord = [0.0, 0.0];

geo_index.insert_many(vec![
("a".to_string(), encode_lat_lng([1.0, 0.0], depth)),
("b".to_string(), encode_lat_lng([1.0, 1.0], depth)),
("c".to_string(), encode_lat_lng([0.0, 1.0], depth)),
("d".to_string(), encode_lat_lng([0.0, 0.0], depth)),
("e".to_string(), encode_lat_lng([-1., 0.0], depth)),
("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)),
("g".to_string(), encode_lat_lng([0.0, -1.0], depth)),
("h".to_string(), encode_lat_lng([0.0, 0.0], depth)),
]);
geo_index.insert_many(
vec![
("a".to_string(), encode_lat_lng([1.0, 0.0], depth)),
("b".to_string(), encode_lat_lng([1.0, 1.0], depth)),
("c".to_string(), encode_lat_lng([0.0, 1.0], depth)),
("d".to_string(), encode_lat_lng([0.0, 0.0], depth)),
("e".to_string(), encode_lat_lng([-1., 0.0], depth)),
("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)),
("g".to_string(), encode_lat_lng([0.0, -1.0], depth)),
("h".to_string(), encode_lat_lng([0.0, 0.0], depth)),
],
None,
);

let count = 1;

Expand Down Expand Up @@ -351,16 +400,19 @@ mod test {
let sorted = true;
let origin: LatLngCoord = [0.0, 0.0];

geo_index.insert_many(vec![
("a".to_string(), encode_lat_lng([1.0, 0.0], depth)),
("b".to_string(), encode_lat_lng([1.0, 1.0], depth)),
("c".to_string(), encode_lat_lng([0.0, 1.0], depth)),
("d".to_string(), encode_lat_lng([0.0, 0.0], depth)),
("e".to_string(), encode_lat_lng([-1., 0.0], depth)),
("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)),
("g".to_string(), encode_lat_lng([0.0, -1.0], depth)),
("h".to_string(), encode_lat_lng([0.0, 0.0], depth)),
]);
geo_index.insert_many(
vec![
("a".to_string(), encode_lat_lng([1.0, 0.0], depth)),
("b".to_string(), encode_lat_lng([1.0, 1.0], depth)),
("c".to_string(), encode_lat_lng([0.0, 1.0], depth)),
("d".to_string(), encode_lat_lng([0.0, 0.0], depth)),
("e".to_string(), encode_lat_lng([-1., 0.0], depth)),
("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)),
("g".to_string(), encode_lat_lng([0.0, -1.0], depth)),
("h".to_string(), encode_lat_lng([0.0, 0.0], depth)),
],
None,
);

let res = geo_index
.search(origin, range, count, sorted, DEFAULT_DEPTH)
Expand All @@ -383,7 +435,7 @@ mod test {

for n in 0..capacity {
let (lat, lng) = (rng.gen_range(-90f64..90f64), rng.gen_range(-180f64..180f64));
geo_index.insert(&n.to_string(), &encode_lat_lng([lat, lng], depth));
geo_index.insert(&n.to_string(), &encode_lat_lng([lat, lng], depth), None);
}

let center = [0f64, 0f64];
Expand All @@ -397,4 +449,28 @@ mod test {
assert!(neighbor.distance <= range);
});
}

/// Keys inserted with a time-to-live must disappear from search results once the
/// time-to-live has elapsed and `purge` has run.
#[test]
fn can_purge() {
    let origin: LatLngCoord = [0.0, 0.0];
    let depth: usize = 6;
    let ttl = Duration::from_millis(500);

    let mut geo_index = SpatialIndex::default();
    for key in ["a", "b"] {
        geo_index.insert(key, &encode_lat_lng(origin, depth), Some(ttl));
    }

    // wait out the full time-to-live so both entries are past their expiration
    std::thread::sleep(ttl);
    geo_index.purge();

    let (range, count, sorted) = (1000.0, 2, false);
    let found = geo_index
        .search(origin, range, count, sorted, DEFAULT_DEPTH)
        .unwrap();
    assert_eq!(found.len(), 0);
}
}
2 changes: 1 addition & 1 deletion geoprox-core/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub type LatLngCoord = [f64; 2];

/// Nearby object
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[derive(Debug, Serialize, Clone, PartialEq)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct Neighbor {
/// Distance in kilometers
pub distance: f64,
Expand Down
Loading

0 comments on commit e67e217

Please sign in to comment.