From e67e217e58b84f2e1f8e30e0309dd9ea82cd5c21 Mon Sep 17 00:00:00 2001 From: Ezra Singh Date: Fri, 16 Aug 2024 01:06:35 -0400 Subject: [PATCH] #15 implement key expiration and cache eviction policy (#16) * Added support for key expiration * Added key expiration and ttl support * Removed debug lines * clippy --- .../benches/spatial_index_benchmark.rs | 6 +- geoprox-core/src/cache.rs | 202 ++++++++++++------ geoprox-core/src/models.rs | 2 +- geoprox-core/src/shard.rs | 20 +- geoprox-server/src/api.rs | 43 +++- geoprox-server/src/dto.rs | 6 +- geoprox-server/src/handlers.rs | 9 +- 7 files changed, 214 insertions(+), 74 deletions(-) diff --git a/geoprox-core/benches/spatial_index_benchmark.rs b/geoprox-core/benches/spatial_index_benchmark.rs index 0d4b90c..e150228 100644 --- a/geoprox-core/benches/spatial_index_benchmark.rs +++ b/geoprox-core/benches/spatial_index_benchmark.rs @@ -38,7 +38,7 @@ fn random_query(rng: &mut ThreadRng) -> (LatLngCoord, f64) { fn seed_index(size: &i32, rng: &mut ThreadRng) -> SpatialIndex { let mut geo_index = SpatialIndex::new(*size as usize); (0..*size).for_each(|n| { - geo_index.insert(&n.to_string(), &random_geohash(rng)); + geo_index.insert(&n.to_string(), &random_geohash(rng), None); }); geo_index } @@ -60,7 +60,7 @@ fn insert_benchmark(c: &mut Criterion) { || (SpatialIndex::new(capacity as usize), data.clone()), |(mut geo_index, data)| { data.iter() - .for_each(|(key, ghash)| black_box(geo_index.insert(key, ghash))) + .for_each(|(key, ghash)| black_box(geo_index.insert(key, ghash, None))) }, BatchSize::LargeInput, ); @@ -73,7 +73,7 @@ fn insert_benchmark(c: &mut Criterion) { |b, &capacity| { b.iter_batched( || (SpatialIndex::new(capacity as usize), data.clone()), - |(mut geo_index, data)| black_box(geo_index.insert_many(data)), + |(mut geo_index, data)| black_box(geo_index.insert_many(data, None)), BatchSize::LargeInput, ); }, diff --git a/geoprox-core/src/cache.rs b/geoprox-core/src/cache.rs index 537c971..7a5e1ed 100644 --- a/geoprox-core/src/cache.rs +++ b/geoprox-core/src/cache.rs @@ -9,7 +9,9 @@ use kiddo::KdTree; use log::debug; use patricia_tree::StringPatriciaMap; use rayon::prelude::*; +use std::collections::BTreeSet; use std::hash::{BuildHasher, BuildHasherDefault}; +use std::time::{Duration, Instant}; #[inline] fn build_search_space( @@ -38,10 +40,12 @@ fn build_search_space( pub struct SpatialIndex { /// maps geohash to common objects prefix_tree: StringPatriciaMap>, - /// maps object key, geohash to hash id (object id) - position_map: HashTable<[String; 2]>, + /// maps object key, geohash and expiration to hash id (object id) + position_map: HashTable<(String, String, Option)>, /// internal hasher hasher: BuildHasherDefault, + /// maps expiration key to object key + expirations: BTreeSet<(Instant, String)>, } impl SpatialIndex { @@ -62,16 +66,28 @@ impl SpatialIndex { } } + /// Removes expired keys + pub fn purge(&mut self) { + let now = (Instant::now(), String::new()); + let expired_keys: HashSet = self + .expirations + .range(..=&now) + .map(|(_, key)| key.to_owned()) + .collect(); + self.remove_many(expired_keys); + self.expirations = self.expirations.split_off(&now); + } + /// Insert key into index at some geographical location - pub fn insert(&mut self, key: &str, ghash: &str) { + pub fn insert(&mut self, key: &str, ghash: &str, expiration: Option) { let id: ObjectIdentifier = self.hasher.hash_one(key); // ? remove object from previous locations if let Entry::Occupied(entry) = self.position_map.entry( id, - |[key, _]| key.eq(key), - |[key, _]| self.hasher.hash_one(key), + |(key, _, _)| key.eq(key), + |(key, _, _)| self.hasher.hash_one(key), ) { - let ([_, old_ghash], _) = entry.remove(); + let ((key, old_ghash, old_expiration), _) = entry.remove(); debug!( "removing object from previous ghash: id={} geohash={}", id, &old_ghash @@ -81,14 +97,31 @@ impl SpatialIndex { } else { self.prefix_tree.remove(old_ghash); } + if let Some(expire_key) = old_expiration { + debug!("removing old expiration for: id={}", id); + self.expirations.remove(&(expire_key, key)); + } } + // ? update position_map & prefix_tree debug!("storing object: id={} key={}", id, key); + if let Some(duration) = expiration { + // ? update expiration tree + let expire_at = Instant::now() + duration; + self.position_map.insert_unique( + id, + (key.to_owned(), ghash.into(), Some(expire_at)), + |(key, _, _)| self.hasher.hash_one(key), + ); + self.expirations.insert((expire_at, key.to_string())); + } else { + self.position_map.insert_unique( + id, + (key.to_owned(), ghash.into(), None), + |(key, _, _)| self.hasher.hash_one(key), + ); + } - self.position_map - .insert_unique(id, [key.to_owned(), ghash.into()], |[key, _]| { - self.hasher.hash_one(key) - }); // ? insert current region into prefix tree if let Some(members) = self.prefix_tree.get_mut(ghash) { members.insert(id); @@ -98,10 +131,14 @@ impl SpatialIndex { } /// insert multiple objects at once - pub fn insert_many(&mut self, objects: impl IntoIterator) { + pub fn insert_many( + &mut self, + objects: impl IntoIterator, + expiration: Option, + ) { objects .into_iter() - .for_each(|(key, ghash)| self.insert(&key, &ghash)); + .for_each(|(key, ghash)| self.insert(&key, &ghash, expiration)); } /// Remove key from index @@ -109,16 +146,19 @@ impl SpatialIndex { let id: ObjectIdentifier = self.hasher.hash_one(key); if let Entry::Occupied(entry) = self.position_map.entry( id, - |[key, _]| key.eq(key), - |[key, _]| self.hasher.hash_one(key), + |(key, _, _)| key.eq(key), + |(key, _, _)| self.hasher.hash_one(key), ) { - let ([_, ghash], _) = entry.remove(); + let ((key, ghash, old_expiration), _) = entry.remove(); if let Some(members) = self.prefix_tree.get_mut(&ghash) { members.remove(&id); if members.is_empty() { self.prefix_tree.remove(&ghash); } } + if let Some(expire_key) = old_expiration { + self.expirations.remove(&(expire_key, key)); + } } true } @@ -162,10 +202,10 @@ impl SpatialIndex { .par_iter() .filter_map(|node| { self.position_map - .find(node.item, |[key, _]| { + .find(node.item, |(key, _, _)| { self.hasher.hash_one(key).eq(&node.item) }) - .map(|[key, _]| Neighbor { + .map(|(key, _, _)| Neighbor { distance: node.distance, key: key.to_owned(), }) @@ -195,7 +235,7 @@ mod test { let key = "test-key"; // place key within the search area - geo_index.insert(key, &encode_lat_lng(origin, insert_depth)); + geo_index.insert(key, &encode_lat_lng(origin, insert_depth), None); let res = geo_index .search(origin, range, 100, false, DEFAULT_DEPTH) @@ -203,7 +243,7 @@ mod test { assert_eq!(res.len(), 1); // move key out of search area - geo_index.insert(key, &encode_lat_lng([-70.0, 100.0], insert_depth)); + geo_index.insert(key, &encode_lat_lng([-70.0, 100.0], insert_depth), None); let res = geo_index .search(origin, range, 100, false, DEFAULT_DEPTH) @@ -211,7 +251,7 @@ mod test { assert_eq!(res.len(), 0); // move key back into search area - geo_index.insert(key, &encode_lat_lng(origin, insert_depth)); + geo_index.insert(key, &encode_lat_lng(origin, insert_depth), None); let res = geo_index .search(origin, range, 100, false, DEFAULT_DEPTH) @@ -226,8 +266,8 @@ mod test { let range = 10.0; let origin: LatLngCoord = [0.0, 0.0]; - geo_index.insert(&"a", &encode_lat_lng(origin, depth)); - geo_index.insert(&"b", &encode_lat_lng(origin, depth)); + geo_index.insert(&"a", &encode_lat_lng(origin, depth), None); + geo_index.insert(&"b", &encode_lat_lng(origin, depth), None); let res = geo_index .search(origin, range, 100, false, DEFAULT_DEPTH) @@ -252,16 +292,19 @@ mod test { let sorted = false; let origin: LatLngCoord = [0.0, 0.0]; - geo_index.insert_many(vec![ - ("a".to_string(), encode_lat_lng([1.0, 0.0], depth)), - ("b".to_string(), encode_lat_lng([1.0, 1.0], depth)), - ("c".to_string(), encode_lat_lng([0.0, 1.0], depth)), - ("d".to_string(), encode_lat_lng([0.0, 0.0], depth)), - ("e".to_string(), encode_lat_lng([-1., 0.0], depth)), - ("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)), - ("g".to_string(), encode_lat_lng([0.0, -1.0], depth)), - ("h".to_string(), encode_lat_lng([0.0, 0.0], depth)), - ]); + geo_index.insert_many( + vec![ + ("a".to_string(), encode_lat_lng([1.0, 0.0], depth)), + ("b".to_string(), encode_lat_lng([1.0, 1.0], depth)), + ("c".to_string(), encode_lat_lng([0.0, 1.0], depth)), + ("d".to_string(), encode_lat_lng([0.0, 0.0], depth)), + ("e".to_string(), encode_lat_lng([-1., 0.0], depth)), + ("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)), + ("g".to_string(), encode_lat_lng([0.0, -1.0], depth)), + ("h".to_string(), encode_lat_lng([0.0, 0.0], depth)), + ], + None, + ); let res = geo_index .search(origin, range, count, sorted, DEFAULT_DEPTH) @@ -281,16 +324,19 @@ mod test { let sorted = false; let origin: LatLngCoord = [0.0, 0.0]; - geo_index.insert_many(vec![ - ("a".to_string(), encode_lat_lng([1.0, 0.0], depth)), - ("b".to_string(), encode_lat_lng([1.0, 1.0], depth)), - ("c".to_string(), encode_lat_lng([0.0, 1.0], depth)), - ("d".to_string(), encode_lat_lng([0.0, 0.0], depth)), - ("e".to_string(), encode_lat_lng([-1., 0.0], depth)), - ("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)), - ("g".to_string(), encode_lat_lng([0.0, -1.0], depth)), - ("h".to_string(), encode_lat_lng([0.0, 0.0], depth)), - ]); + geo_index.insert_many( + vec![ + ("a".to_string(), encode_lat_lng([1.0, 0.0], depth)), + ("b".to_string(), encode_lat_lng([1.0, 1.0], depth)), + ("c".to_string(), encode_lat_lng([0.0, 1.0], depth)), + ("d".to_string(), encode_lat_lng([0.0, 0.0], depth)), + ("e".to_string(), encode_lat_lng([-1., 0.0], depth)), + ("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)), + ("g".to_string(), encode_lat_lng([0.0, -1.0], depth)), + ("h".to_string(), encode_lat_lng([0.0, 0.0], depth)), + ], + None, + ); let res = geo_index .search(origin, range, count, sorted, DEFAULT_DEPTH) @@ -306,16 +352,19 @@ mod test { let range = 1000.0; let origin: LatLngCoord = [0.0, 0.0]; - geo_index.insert_many(vec![ - ("a".to_string(), encode_lat_lng([1.0, 0.0], depth)), - ("b".to_string(), encode_lat_lng([1.0, 1.0], depth)), - ("c".to_string(), encode_lat_lng([0.0, 1.0], depth)), - ("d".to_string(), encode_lat_lng([0.0, 0.0], depth)), - ("e".to_string(), encode_lat_lng([-1., 0.0], depth)), - ("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)), - ("g".to_string(), encode_lat_lng([0.0, -1.0], depth)), - ("h".to_string(), encode_lat_lng([0.0, 0.0], depth)), - ]); + geo_index.insert_many( + vec![ + ("a".to_string(), encode_lat_lng([1.0, 0.0], depth)), + ("b".to_string(), encode_lat_lng([1.0, 1.0], depth)), + ("c".to_string(), encode_lat_lng([0.0, 1.0], depth)), + ("d".to_string(), encode_lat_lng([0.0, 0.0], depth)), + ("e".to_string(), encode_lat_lng([-1., 0.0], depth)), + ("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)), + ("g".to_string(), encode_lat_lng([0.0, -1.0], depth)), + ("h".to_string(), encode_lat_lng([0.0, 0.0], depth)), + ], + None, + ); let count = 1; @@ -351,16 +400,19 @@ mod test { let sorted = true; let origin: LatLngCoord = [0.0, 0.0]; - geo_index.insert_many(vec![ - ("a".to_string(), encode_lat_lng([1.0, 0.0], depth)), - ("b".to_string(), encode_lat_lng([1.0, 1.0], depth)), - ("c".to_string(), encode_lat_lng([0.0, 1.0], depth)), - ("d".to_string(), encode_lat_lng([0.0, 0.0], depth)), - ("e".to_string(), encode_lat_lng([-1., 0.0], depth)), - ("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)), - ("g".to_string(), encode_lat_lng([0.0, -1.0], depth)), - ("h".to_string(), encode_lat_lng([0.0, 0.0], depth)), - ]); + geo_index.insert_many( + vec![ + ("a".to_string(), encode_lat_lng([1.0, 0.0], depth)), + ("b".to_string(), encode_lat_lng([1.0, 1.0], depth)), + ("c".to_string(), encode_lat_lng([0.0, 1.0], depth)), + ("d".to_string(), encode_lat_lng([0.0, 0.0], depth)), + ("e".to_string(), encode_lat_lng([-1., 0.0], depth)), + ("f".to_string(), encode_lat_lng([-1.0, -1.0], depth)), + ("g".to_string(), encode_lat_lng([0.0, -1.0], depth)), + ("h".to_string(), encode_lat_lng([0.0, 0.0], depth)), + ], + None, + ); let res = geo_index .search(origin, range, count, sorted, DEFAULT_DEPTH) @@ -383,7 +435,7 @@ mod test { for n in 0..capacity { let (lat, lng) = (rng.gen_range(-90f64..90f64), rng.gen_range(-180f64..180f64)); - geo_index.insert(&n.to_string(), &encode_lat_lng([lat, lng], depth)); + geo_index.insert(&n.to_string(), &encode_lat_lng([lat, lng], depth), None); } let center = [0f64, 0f64]; @@ -397,4 +449,28 @@ mod test { assert!(neighbor.distance <= range); }); } + + #[test] + fn can_purge() { + let mut geo_index = SpatialIndex::default(); + let depth: usize = 6; + let range = 1000.0; + let count = 2; + let sorted = false; + let origin: LatLngCoord = [0.0, 0.0]; + + let duration = Duration::from_millis(500); + + geo_index.insert(&"a", &encode_lat_lng(origin, depth), Some(duration)); + geo_index.insert(&"b", &encode_lat_lng(origin, depth), Some(duration)); + + std::thread::sleep(duration); + + geo_index.purge(); + + let res = geo_index + .search(origin, range, count, sorted, DEFAULT_DEPTH) + .unwrap(); + assert_eq!(res.len(), 0); + } } diff --git a/geoprox-core/src/models.rs b/geoprox-core/src/models.rs index a21c097..41554b4 100644 --- a/geoprox-core/src/models.rs +++ b/geoprox-core/src/models.rs @@ -9,7 +9,7 @@ pub type LatLngCoord = [f64; 2]; /// Nearby object #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] -#[derive(Debug, Serialize, Clone, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct Neighbor { /// Distance in kilometers pub distance: f64, diff --git a/geoprox-core/src/shard.rs b/geoprox-core/src/shard.rs index 494aa0b..db55ef8 100644 --- a/geoprox-core/src/shard.rs +++ b/geoprox-core/src/shard.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use geohash::GeohashError; use hashbrown::{HashMap, HashSet}; use itertools::Itertools; @@ -7,7 +9,7 @@ use crate::cache::SpatialIndex; use crate::models::{BatchOutput, GeoShardConfig, GeoShardError, LatLngCoord, Neighbor}; /// A collection of geospatial indexes stored in-memory -#[derive(Default, Clone)] +#[derive(Clone, Default)] pub struct GeoShard { cache: HashMap, config: GeoShardConfig, @@ -39,6 +41,13 @@ impl GeoShard { } } + /// Removes expired keys for all indices + pub fn purge_keys(&mut self) { + self.cache + .par_iter_mut() + .for_each(|(_, index)| index.purge()) + } + /// Deletes index from the shard pub fn drop_index(&mut self, index: &str) { self.cache.remove(index); @@ -50,6 +59,7 @@ impl GeoShard { index: &str, key: &str, [lat, lng]: LatLngCoord, + ttl: Option, ) -> Result { if let Some(geo_index) = self.cache.get_mut(index) { match geohash::encode( @@ -57,7 +67,7 @@ impl GeoShard { self.config.insert_depth.unwrap_or(Self::DEFAULT_DEPTH), ) { Ok(ghash) => { - geo_index.insert(key, &ghash); + geo_index.insert(key, &ghash, ttl); Ok(ghash) } Err(err) => Err(GeoShardError::GeohashError(err)), @@ -72,6 +82,7 @@ impl GeoShard { &mut self, index: &str, objects: Vec<(String, LatLngCoord)>, + ttl: Option, preserve_order: bool, ) -> Result, GeoShardError> { if let Some(geo_index) = self.cache.get_mut(index) { @@ -94,7 +105,7 @@ impl GeoShard { }) }; - geo_index.insert_many(bulk.clone()); + geo_index.insert_many(bulk.clone(), ttl); Ok((bulk, errors)) } else { @@ -217,6 +228,7 @@ mod test { ("b".to_string(), [1.0, 0.5]), ("c".to_string(), [0.0, 0.0]), ], + None, false, ) .unwrap(); @@ -245,6 +257,7 @@ mod test { ("e".to_string(), [-1.0, -0.5]), ("f".to_string(), [0.0, 0.0]), ], + None, false, ) .unwrap(); @@ -288,6 +301,7 @@ mod test { ("e".to_string(), [-1.0, -0.5]), ("f".to_string(), [0.0, 0.0]), ], + None, false, ) .unwrap(); diff --git a/geoprox-server/src/api.rs b/geoprox-server/src/api.rs index 08761b8..5387209 100644 --- a/geoprox-server/src/api.rs +++ b/geoprox-server/src/api.rs @@ -6,11 +6,23 @@ use axum::{ Router, }; use std::sync::Arc; +use std::time::Duration; /// Returns REST API router pub fn routes(app_state: AppState) -> Router { let state = SharedState::from(app_state); + let sidecar_state = Arc::clone(&state); + tokio::spawn(async move { + let mut timer = tokio::time::interval(Duration::from_secs(1)); + loop { + timer.tick().await; + if let Ok(mut app) = sidecar_state.try_write() { + app.geoshard.purge_keys(); + } + } + }); + Router::new() .nest( "/geohash", @@ -157,7 +169,7 @@ mod test { .json(&json!({ "key": "Alice", "lat": 1.0, "lng": 0.0 })) .await; server - .put("/api/v1/geoshard/drivers") + .put("/api/v1/shard/drivers") .json(&json!({ "key": "Bob", "lat": 0.0, "lng": 1.0 })) .await; let res = server @@ -169,4 +181,33 @@ mod test { res.assert_status_ok(); assert_eq!(res.header("content-type"), "application/json"); } + + #[tokio::test] + async fn can_invalidate_keys() { + let server = setup(); + server.post("/api/v1/shard/drivers").await; + server + .put("/api/v1/shard/drivers") + .json(&json!({ "key": "Alice", "lat": 0.0, "lng": 0.0, "ttl": 1 })) + .await; + server + .put("/api/v1/shard/drivers") + .json(&json!({ "key": "Bob", "lat": 0.0, "lng": 0.0, "ttl": 1 })) + .await; + let res = server + .get("/api/v1/shard/drivers") + .add_query_params(json!({ + "lat": 0.0, "lng": 0.0, "range": 1000 + })) + .await; + assert_eq!(res.json::().found.len(), 2); + tokio::time::sleep(Duration::from_secs(2)).await; + let res = server + .get("/api/v1/shard/drivers") + .add_query_params(json!({ + "lat": 0.0, "lng": 0.0, "range": 1000 + })) + .await; + assert_eq!(res.json::().found.len(), 0); + } } diff --git a/geoprox-server/src/dto.rs b/geoprox-server/src/dto.rs index f0cfd85..9080ae8 100644 --- a/geoprox-server/src/dto.rs +++ b/geoprox-server/src/dto.rs @@ -89,6 +89,8 @@ pub struct InsertKey { pub lat: f64, /// Longitude pub lng: f64, + /// The time-to-live (TTL) for this key, in seconds + pub ttl: Option, } /// Returns key and geohash @@ -105,6 +107,8 @@ pub struct InsertKeyResponse { pub struct InsertKeyBatch { /// Object key pub keys: Vec, + /// The time-to-live (TTL) for these keys, in seconds + pub ttl: Option, // Insert keys in the order they were received. pub preserve_order: bool, } @@ -182,7 +186,7 @@ pub struct QueryRange { } /// Returns object keys found with their distance -#[derive(Serialize, ToSchema, ToResponse)] +#[derive(Serialize, Deserialize, ToSchema, ToResponse)] pub struct QueryRangeResponse { /// Object keys found within range pub found: Vec, diff --git a/geoprox-server/src/handlers.rs b/geoprox-server/src/handlers.rs index 1acf2c9..63cc94f 100644 --- a/geoprox-server/src/handlers.rs +++ b/geoprox-server/src/handlers.rs @@ -92,6 +92,8 @@ pub mod geohash_api { } pub mod geoshard_api { + use std::time::Duration; + use crate::app::{AppError, SharedState}; use crate::dto::{ CreateIndexResponse, DropIndexResponse, InsertKey, InsertKeyBatch, InsertKeyBatchResponse, @@ -194,10 +196,11 @@ pub mod geoshard_api { extract::Json(payload): extract::Json, ) -> Result, AppError> { let mut state = state.write().unwrap(); + let ttl: Option = payload.ttl.map(Duration::from_secs); match state .geoshard - .insert_key(&index, &payload.key, [payload.lat, payload.lng]) + .insert_key(&index, &payload.key, [payload.lat, payload.lng], ttl) { Ok(geohash) => Ok(Json(InsertKeyResponse { key: payload.key, @@ -230,11 +233,13 @@ pub mod geoshard_api { extract::Json(payload): extract::Json, ) -> Result, AppError> { let mut state = state.write().unwrap(); + let ttl: Option = payload.ttl.map(Duration::from_secs); + let preserve_order = payload.preserve_order; match state .geoshard - .insert_many_keys(&index, payload.into(), preserve_order) + .insert_many_keys(&index, payload.into(), ttl, preserve_order) { Ok((res, errs)) => Ok(Json(InsertKeyBatchResponse { results: res.into_iter().collect(),