From b05688060511fa2ed43a380ed841a58aeaf7d35b Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Sun, 17 Nov 2024 20:13:30 +0530 Subject: [PATCH 01/10] init Signed-off-by: Pushkar Mishra --- Cargo.toml | 2 + fs-cache/Cargo.toml | 30 +++ fs-cache/src/cache.rs | 69 ++++++ fs-cache/src/lib.rs | 2 + fs-cache/src/memory_limited_storage.rs | 299 +++++++++++++++++++++++++ 5 files changed, 402 insertions(+) create mode 100644 fs-cache/Cargo.toml create mode 100644 fs-cache/src/cache.rs create mode 100644 fs-cache/src/lib.rs create mode 100644 fs-cache/src/memory_limited_storage.rs diff --git a/Cargo.toml b/Cargo.toml index eda17fbc..63a2235e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "data-pdf", "data-resource", "fs-atomic-versions", + "fs-cache", "fs-atomic-light", "fs-metadata", "fs-properties", @@ -23,6 +24,7 @@ default-members = [ "data-pdf", "data-resource", "fs-atomic-versions", + "fs-cache", "fs-atomic-light", "fs-metadata", "fs-properties", diff --git a/fs-cache/Cargo.toml b/fs-cache/Cargo.toml new file mode 100644 index 00000000..07e12fd1 --- /dev/null +++ b/fs-cache/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "fs-cache" +version = "0.1.0" +edition = "2021" + +[lib] +name = "fs_cache" +crate-type = ["rlib", "cdylib"] +bench = false + +[dependencies] +log = { version = "0.4.17", features = ["release_max_level_off"] } +serde_json = "1.0.82" +serde = { version = "1.0.138", features = ["derive"] } +jni = { version = "0.21.1", optional = true } +jnix = { version = "0.5.1", features = ["derive"], optional = true } +data-error = { path = "../data-error" } +data-resource = { path = "../data-resource" } +fs-storage = { path = "../fs-storage"} +linked-hash-map = "0.5.6" + +[dev-dependencies] +anyhow = "1.0.81" +quickcheck = { version = "1.0.3", features = ["use_logging"] } +quickcheck_macros = "1.0.0" +tempdir = "0.3.7" + +[features] +default = ["jni-bindings"] +jni-bindings = ["jni", "jnix"] diff --git a/fs-cache/src/cache.rs b/fs-cache/src/cache.rs new file mode 100644 index 00000000..5adff9c2 --- /dev/null +++ b/fs-cache/src/cache.rs @@ -0,0 +1,69 @@ +use data_error::Result; +use fs_storage::{base_storage::SyncStatus, monoid::Monoid}; +use std::path::Path; + +use crate::memory_limited_storage::MemoryLimitedStorage; + +/// A generic cache implementation that stores values with LRU eviction in memory +/// and persistence to disk. 
+pub struct Cache<K, V> {
+    storage: MemoryLimitedStorage<K, V>,
+}
+
+impl<K, V> Cache<K, V>
+where
+    K: Ord
+        + Clone
+        + serde::Serialize
+        + serde::de::DeserializeOwned
+        + std::fmt::Display
+        + std::hash::Hash
+        + std::str::FromStr,
+    V: Clone + serde::Serialize + serde::de::DeserializeOwned + Monoid<V>,
+{
+    /// Create a new cache with given capacity
+    /// - `label`: Used for logging and error messages
+    /// - `path`: Directory where cache files will be stored
+    /// - `max_memory_items`: Maximum number of items to keep in memory
+    pub fn new(
+        label: String,
+        path: &Path,
+        max_memory_items: usize,
+    ) -> Result<Self> {
+        let storage = MemoryLimitedStorage::new(label, path, max_memory_items)?;
+
+        Ok(Self { storage })
+    }
+
+    /// Get a value from the cache if it exists
+    /// Returns None if not found
+    pub fn get(&mut self, key: &K) -> Result<Option<V>> {
+        self.storage.get(key)
+    }
+
+    /// Store a value in the cache
+    /// Will persist to disk and maybe keep in memory based on LRU policy
+    pub fn set(&mut self, key: K, value: V) -> Result<()> {
+        self.storage.set(key, value)
+    }
+
+    /// Load most recent cached items into memory based on timestamps
+    pub fn load_recent(&mut self) -> Result<()> {
+        self.storage.load_fs()
+    }
+
+    /// Get number of items currently in memory
+    // pub fn memory_items(&self) -> usize {
+    //     self.storage.memory_items()
+    // }
+
+    /// Get sync status between memory and disk
+    pub fn sync_status(&self) -> Result<SyncStatus> {
+        self.storage.sync_status()
+    }
+
+    /// Sync changes to disk
+    pub fn sync(&mut self) -> Result<()> {
+        self.storage.sync()
+    }
+}
diff --git a/fs-cache/src/lib.rs b/fs-cache/src/lib.rs
new file mode 100644
index 00000000..9a5f392b
--- /dev/null
+++ b/fs-cache/src/lib.rs
@@ -0,0 +1,2 @@
+pub mod memory_limited_storage;
+pub mod cache;
diff --git a/fs-cache/src/memory_limited_storage.rs b/fs-cache/src/memory_limited_storage.rs
new file mode 100644
index 00000000..c81415c6
--- /dev/null
+++ b/fs-cache/src/memory_limited_storage.rs
@@ -0,0 +1,299 @@
+use std::collections::BTreeSet;
+use std::fs::{self, File};
+use std::io::Write;
+use std::time::SystemTime;
+use std::{
+    collections::BTreeMap,
+    path::{Path, PathBuf},
+};
+
+use data_error::{ArklibError, Result};
+use fs_storage::base_storage::SyncStatus;
+use fs_storage::monoid::Monoid;
+use linked_hash_map::LinkedHashMap;
+
+pub struct MemoryLimitedStorage<K, V> {
+    /// Label for logging
+    label: String,
+    /// Path to the underlying folder where data is persisted
+    path: PathBuf,
+    /// In-memory LRU cache combining map and queue functionality
+    memory_cache: LinkedHashMap<K, V>,
+    /// Maximum number of items to keep in memory
+    max_memory_items: usize,
+    /// Track disk timestamps only
+    disk_timestamps: BTreeMap<K, SystemTime>,
+    /// Temporary store for deleted keys until storage is synced
+    deleted_keys: BTreeSet<K>,
+}
+
+impl<K, V> MemoryLimitedStorage<K, V>
+where
+    K: Ord
+        + Clone
+        + serde::Serialize
+        + serde::de::DeserializeOwned
+        + std::fmt::Display
+        + std::hash::Hash
+        + std::str::FromStr,
+    V: Clone + serde::Serialize + serde::de::DeserializeOwned + Monoid<V>,
+{
+    pub fn new(
+        label: String,
+        path: &Path,
+        max_memory_items: usize,
+    ) -> Result<Self> {
+        let storage = Self {
+            label,
+            path: PathBuf::from(path),
+            memory_cache: LinkedHashMap::with_capacity(max_memory_items),
+            max_memory_items,
+            disk_timestamps: BTreeMap::new(),
+            deleted_keys: BTreeSet::new(),
+        };
+
+        // TODO: add load_fs;
+
+        // Create directory if it doesn't exist
+        fs::create_dir_all(&storage.path)?;
+
+        Ok(storage)
+    }
+
+    pub fn load_fs(&mut self) -> Result<()> {
+        if !self.path.exists() {
+            return 
Ok(()); + } + + // Collect all files with their timestamps + let mut entries = Vec::new(); + for entry in fs::read_dir(&self.path)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() + && path + .extension() + .map_or(false, |ext| ext == "json") + { + if let Ok(metadata) = fs::metadata(&path) { + if let Ok(modified) = metadata.modified() { + let key: K = + extract_key_from_file_path(&self.label, &path)?; + entries.push((key, modified, path)); + } + } + } + } + + // Sort by timestamp, newest first + entries.sort_by(|a, b| b.1.cmp(&a.1)); + + // Clear current cache and timestamps + self.memory_cache.clear(); + self.disk_timestamps.clear(); + + // Load only up to max_memory_items, newest first + for (key, timestamp, path) in entries.iter().take(self.max_memory_items) + { + match File::open(path) { + Ok(file) => { + if let Ok(value) = serde_json::from_reader(file) { + self.memory_cache.insert(key.clone(), value); + } + } + Err(err) => { + log::warn!("Failed to read file for key {}: {}", key, err); + continue; + } + } + self.disk_timestamps + .insert(key.clone(), *timestamp); + } + + // Add remaining timestamps to disk_timestamps without loading values + for (key, timestamp, _) in entries.iter().skip(self.max_memory_items) { + self.disk_timestamps + .insert(key.clone(), *timestamp); + } + + log::info!( + "{} loaded {} items in memory, {} total on disk", + self.label, + self.memory_cache.len(), + self.disk_timestamps.len() + ); + + Ok(()) + } + + // Write a single value to disk + fn write_value_to_disk(&mut self, key: &K, value: &V) -> Result<()> { + let file_path = self.path.join(format!("{}.json", key)); + let mut file = File::create(&file_path)?; + file.write_all(serde_json::to_string_pretty(&value)?.as_bytes())?; + file.flush()?; + + let new_timestamp = SystemTime::now(); + file.set_modified(new_timestamp)?; + file.sync_all()?; + + self.disk_timestamps + .insert(key.clone(), new_timestamp); + + Ok(()) + } + + // Load a single value from disk + fn load_value_from_disk(&self, key: &K) -> Result { + let file_path = self.path.join(format!("{}.json", key)); + let file = File::open(&file_path)?; + let value: V = serde_json::from_reader(file).map_err(|err| { + ArklibError::Storage( + self.label.clone(), + format!("Failed to read value for key {}: {}", key, err), + ) + })?; + Ok(value) + } + + pub fn get(&mut self, key: &K) -> Result> { + // Check memory cache first - will update LRU order automatically + if let Some(value) = self.memory_cache.get_refresh(key) { + return Ok(Some(value.clone())); + } + + // Try to load from disk + let file_path = self.path.join(format!("{}.json", key)); + if file_path.exists() && !self.deleted_keys.contains(key) { + let value = self.load_value_from_disk(key)?; + self.add_to_memory_cache(key.clone(), value.clone()); + return Ok(Some(value)); + } + + Ok(None) + } + + fn add_to_memory_cache(&mut self, key: K, value: V) { + // If at capacity, LinkedHashMap will remove oldest entry automatically + if self.memory_cache.len() >= self.max_memory_items { + self.memory_cache.pop_front(); // Removes least recently used + } + self.memory_cache.insert(key, value); + } + + pub fn set(&mut self, key: K, value: V) -> Result<()> { + // Always write to disk first + self.write_value_to_disk(&key, &value)?; + + // Then update memory cache + self.add_to_memory_cache(key.clone(), value); + self.deleted_keys.remove(&key); + + Ok(()) + } + + pub fn sync(&mut self) -> Result<()> { + // Since we write through on set(), we only need to handle removals + for key in 
self.deleted_keys.iter() { + let file_path = self.path.join(format!("{}.json", key)); + if file_path.exists() { + fs::remove_file(&file_path)?; + self.disk_timestamps.remove(key); + } + } + + // Also add latest externally added cache in memory, by comparing timestamps + // for entry in fs::read_dir(&self.path)? { + // let entry = entry?; + // let path = entry.path(); + // if path.is_file() && path.extension().map_or(false, |ext| ext == "json") { + // let key = extract_key_from_file_path(&self.label, &path)?; + + // // Only handle completely new keys + // if !self.memory_cache.contains_key(&key) + // && !self.disk_timestamps.contains_key(&key) + // && !self.deleted_keys.contains(&key) { + + // if let Ok(metadata) = fs::metadata(&path) { + // if let Ok(modified) = metadata.modified() { + // // New key found - load it + // if let Ok(value) = self.load_value_from_disk(&key) { + // self.disk_timestamps.insert(key.clone(), modified); + + // // Only add to memory if we have space or it's newer than oldest + // if self.memory_cache.len() < self.max_memory_items { + // self.add_to_memory_cache(key, value); + // } + // // Could add logic here to compare with oldest memory item + // } + // } + // } + // } + // } + // } + + self.deleted_keys.clear(); + Ok(()) + } + + pub fn sync_status(&self) -> Result { + // Since we write-through on set(), the only thing that can make storage stale + // is pending deletions + let ram_newer = !self.deleted_keys.is_empty(); + + // Check for new files on disk that we don't know about + let mut disk_newer = false; + for entry in fs::read_dir(&self.path)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() + && path + .extension() + .map_or(false, |ext| ext == "json") + { + let key = extract_key_from_file_path(&self.label, &path)?; + if !self.memory_cache.contains_key(&key) + && !self.disk_timestamps.contains_key(&key) + && !self.deleted_keys.contains(&key) + { + disk_newer = true; + break; + } + } + } + + Ok(match (ram_newer, disk_newer) { + (false, false) => SyncStatus::InSync, + (true, false) => SyncStatus::StorageStale, + (false, true) => SyncStatus::MappingStale, + (true, true) => SyncStatus::Diverge, + }) + } +} + +fn extract_key_from_file_path(label: &str, path: &Path) -> Result +where + K: std::str::FromStr, +{ + path.file_stem() + .ok_or_else(|| { + ArklibError::Storage( + label.to_owned(), + "Failed to extract file stem from filename".to_owned(), + ) + })? + .to_str() + .ok_or_else(|| { + ArklibError::Storage( + label.to_owned(), + "Failed to convert file stem to string".to_owned(), + ) + })? 
+ .parse::() + .map_err(|_| { + ArklibError::Storage( + label.to_owned(), + "Failed to parse key from filename".to_owned(), + ) + }) +} From 8797e8f671f802742b69424011f624c82007ae28 Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Sun, 17 Nov 2024 20:14:05 +0530 Subject: [PATCH 02/10] fix Signed-off-by: Pushkar Mishra --- fs-cache/src/lib.rs | 2 +- fs-cache/src/memory_limited_storage.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/fs-cache/src/lib.rs b/fs-cache/src/lib.rs index 9a5f392b..4f916de7 100644 --- a/fs-cache/src/lib.rs +++ b/fs-cache/src/lib.rs @@ -1,2 +1,2 @@ -pub mod memory_limited_storage; pub mod cache; +pub mod memory_limited_storage; diff --git a/fs-cache/src/memory_limited_storage.rs b/fs-cache/src/memory_limited_storage.rs index c81415c6..c0c6d313 100644 --- a/fs-cache/src/memory_limited_storage.rs +++ b/fs-cache/src/memory_limited_storage.rs @@ -208,18 +208,18 @@ where // let path = entry.path(); // if path.is_file() && path.extension().map_or(false, |ext| ext == "json") { // let key = extract_key_from_file_path(&self.label, &path)?; - + // // Only handle completely new keys - // if !self.memory_cache.contains_key(&key) - // && !self.disk_timestamps.contains_key(&key) + // if !self.memory_cache.contains_key(&key) + // && !self.disk_timestamps.contains_key(&key) // && !self.deleted_keys.contains(&key) { - + // if let Ok(metadata) = fs::metadata(&path) { // if let Ok(modified) = metadata.modified() { // // New key found - load it // if let Ok(value) = self.load_value_from_disk(&key) { // self.disk_timestamps.insert(key.clone(), modified); - + // // Only add to memory if we have space or it's newer than oldest // if self.memory_cache.len() < self.max_memory_items { // self.add_to_memory_cache(key, value); @@ -231,7 +231,7 @@ where // } // } // } - + self.deleted_keys.clear(); Ok(()) } From 56597acd0173fc7afad34f26f381fa422ada67c5 Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Mon, 18 Nov 2024 22:04:35 +0530 Subject: [PATCH 03/10] few changes Signed-off-by: Pushkar Mishra --- ark-cli/src/util.rs | 7 +- fs-cache/Cargo.toml | 6 -- fs-cache/src/memory_limited_storage.rs | 93 ++++++++++++++++---------- 3 files changed, 60 insertions(+), 46 deletions(-) diff --git a/ark-cli/src/util.rs b/ark-cli/src/util.rs index 40eff290..c5db3536 100644 --- a/ark-cli/src/util.rs +++ b/ark-cli/src/util.rs @@ -179,10 +179,9 @@ pub fn translate_storage( root: &Option, storage: &str, ) -> Option<(PathBuf, Option)> { - if let Ok(path) = PathBuf::from_str(storage) { - if path.exists() && path.is_dir() { - return Some((path, None)); - } + let Ok(path) = PathBuf::from_str(storage); + if path.exists() && path.is_dir() { + return Some((path, None)); } match storage.to_lowercase().as_str() { diff --git a/fs-cache/Cargo.toml b/fs-cache/Cargo.toml index 07e12fd1..acd38473 100644 --- a/fs-cache/Cargo.toml +++ b/fs-cache/Cargo.toml @@ -12,8 +12,6 @@ bench = false log = { version = "0.4.17", features = ["release_max_level_off"] } serde_json = "1.0.82" serde = { version = "1.0.138", features = ["derive"] } -jni = { version = "0.21.1", optional = true } -jnix = { version = "0.5.1", features = ["derive"], optional = true } data-error = { path = "../data-error" } data-resource = { path = "../data-resource" } fs-storage = { path = "../fs-storage"} @@ -24,7 +22,3 @@ anyhow = "1.0.81" quickcheck = { version = "1.0.3", features = ["use_logging"] } quickcheck_macros = "1.0.0" tempdir = "0.3.7" - -[features] -default = ["jni-bindings"] -jni-bindings = ["jni", "jnix"] diff --git 
a/fs-cache/src/memory_limited_storage.rs b/fs-cache/src/memory_limited_storage.rs index c0c6d313..945d1fba 100644 --- a/fs-cache/src/memory_limited_storage.rs +++ b/fs-cache/src/memory_limited_storage.rs @@ -43,7 +43,7 @@ where path: &Path, max_memory_items: usize, ) -> Result { - let storage = Self { + let mut storage = Self { label, path: PathBuf::from(path), memory_cache: LinkedHashMap::with_capacity(max_memory_items), @@ -52,17 +52,24 @@ where deleted_keys: BTreeSet::new(), }; - // TODO: add load_fs; - - // Create directory if it doesn't exist - fs::create_dir_all(&storage.path)?; + storage.load_fs()?; Ok(storage) } pub fn load_fs(&mut self) -> Result<()> { if !self.path.exists() { - return Ok(()); + return Err(ArklibError::Storage( + self.label.clone(), + "Folder does not exist".to_owned(), + )); + } + + if !self.path.is_dir() { + return Err(ArklibError::Storage( + self.label.clone(), + "Path is not a directory".to_owned(), + )); } // Collect all files with their timestamps @@ -110,6 +117,7 @@ where .insert(key.clone(), *timestamp); } + // TODO: WHY?: Later used in sync-status to detect is it recently externally modified or not // Add remaining timestamps to disk_timestamps without loading values for (key, timestamp, _) in entries.iter().skip(self.max_memory_items) { self.disk_timestamps @@ -193,7 +201,7 @@ where } pub fn sync(&mut self) -> Result<()> { - // Since we write through on set(), we only need to handle removals + // Handle removals for key in self.deleted_keys.iter() { let file_path = self.path.join(format!("{}.json", key)); if file_path.exists() { @@ -202,35 +210,48 @@ where } } - // Also add latest externally added cache in memory, by comparing timestamps - // for entry in fs::read_dir(&self.path)? { - // let entry = entry?; - // let path = entry.path(); - // if path.is_file() && path.extension().map_or(false, |ext| ext == "json") { - // let key = extract_key_from_file_path(&self.label, &path)?; - - // // Only handle completely new keys - // if !self.memory_cache.contains_key(&key) - // && !self.disk_timestamps.contains_key(&key) - // && !self.deleted_keys.contains(&key) { - - // if let Ok(metadata) = fs::metadata(&path) { - // if let Ok(modified) = metadata.modified() { - // // New key found - load it - // if let Ok(value) = self.load_value_from_disk(&key) { - // self.disk_timestamps.insert(key.clone(), modified); - - // // Only add to memory if we have space or it's newer than oldest - // if self.memory_cache.len() < self.max_memory_items { - // self.add_to_memory_cache(key, value); - // } - // // Could add logic here to compare with oldest memory item - // } - // } - // } - // } - // } - // } + // Collect and sort all entries by timestamp + let mut entries = Vec::new(); + for entry in fs::read_dir(&self.path)? 
{ + let entry = entry?; + let path = entry.path(); + if path.is_file() + && path + .extension() + .map_or(false, |ext| ext == "json") + { + if let Ok(metadata) = fs::metadata(&path) { + if let Ok(modified) = metadata.modified() { + let key = + extract_key_from_file_path(&self.label, &path)?; + if !self.deleted_keys.contains(&key) { + entries.push((key, modified, path)); + } + } + } + } + } + + // Sort by timestamp, newest first + entries.sort_by(|a, b| b.1.cmp(&a.1)); + + // Clear current cache and rebuild with most recent entries + self.memory_cache.clear(); + + // Take most recent entries up to max_memory_items + for (key, timestamp, _) in entries.iter().take(self.max_memory_items) { + if let Ok(value) = self.load_value_from_disk(key) { + self.memory_cache.insert(key.clone(), value); + self.disk_timestamps + .insert(key.clone(), *timestamp); + } + } + + // Track remaining timestamps without loading values + for (key, timestamp, _) in entries.iter().skip(self.max_memory_items) { + self.disk_timestamps + .insert(key.clone(), *timestamp); + } self.deleted_keys.clear(); Ok(()) From e5b41bc3a7f984b56d906a071ea45be4c001c09a Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Wed, 20 Nov 2024 22:10:16 +0530 Subject: [PATCH 04/10] simplified cache & removed `remove` Signed-off-by: Pushkar Mishra --- fs-cache/src/cache.rs | 59 +++++--------- fs-cache/src/memory_limited_storage.rs | 106 ++----------------------- 2 files changed, 26 insertions(+), 139 deletions(-) diff --git a/fs-cache/src/cache.rs b/fs-cache/src/cache.rs index 5adff9c2..8fa19620 100644 --- a/fs-cache/src/cache.rs +++ b/fs-cache/src/cache.rs @@ -1,11 +1,7 @@ +use crate::memory_limited_storage::MemoryLimitedStorage; use data_error::Result; -use fs_storage::{base_storage::SyncStatus, monoid::Monoid}; use std::path::Path; -use crate::memory_limited_storage::MemoryLimitedStorage; - -/// A generic cache implementation that stores values with LRU eviction in memory -/// and persistence to disk. 
pub struct Cache { storage: MemoryLimitedStorage, } @@ -19,51 +15,36 @@ where + std::fmt::Display + std::hash::Hash + std::str::FromStr, - V: Clone + serde::Serialize + serde::de::DeserializeOwned + Monoid, + V: Clone + serde::Serialize + serde::de::DeserializeOwned, { - /// Create a new cache with given capacity - /// - `label`: Used for logging and error messages - /// - `path`: Directory where cache files will be stored - /// - `max_memory_items`: Maximum number of items to keep in memory pub fn new( label: String, path: &Path, - max_memory_items: usize, + max_memory_bytes: usize, ) -> Result { - let storage = MemoryLimitedStorage::new(label, path, max_memory_items)?; - - Ok(Self { storage }) + log::debug!( + "{} cache initialized with {} bytes limit", + label, + max_memory_bytes + ); + Ok(Self { + storage: MemoryLimitedStorage::new(label, path, max_memory_bytes)?, + }) } - /// Get a value from the cache if it exists - /// Returns None if not found pub fn get(&mut self, key: &K) -> Result> { - self.storage.get(key) + let result = self.storage.get(key)?; + log::debug!( + "{} cache: get key={} -> found={}", + self.storage.label(), + key, + result.is_some() + ); + Ok(result) } - /// Store a value in the cache - /// Will persist to disk and maybe keep in memory based on LRU policy pub fn set(&mut self, key: K, value: V) -> Result<()> { + log::debug!("{} cache: set key={}", self.storage.label(), key); self.storage.set(key, value) } - - /// Load most recent cached items into memory based on timestamps - pub fn load_recent(&mut self) -> Result<()> { - self.storage.load_fs() - } - - /// Get number of items currently in memory - // pub fn memory_items(&self) -> usize { - // self.storage.memory_items() - // } - - /// Get sync status between memory and disk - pub fn sync_status(&self) -> Result { - self.storage.sync_status() - } - - /// Sync changes to disk - pub fn sync(&mut self) -> Result<()> { - self.storage.sync() - } } diff --git a/fs-cache/src/memory_limited_storage.rs b/fs-cache/src/memory_limited_storage.rs index 945d1fba..7e86a37b 100644 --- a/fs-cache/src/memory_limited_storage.rs +++ b/fs-cache/src/memory_limited_storage.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeSet; use std::fs::{self, File}; use std::io::Write; use std::time::SystemTime; @@ -8,8 +7,6 @@ use std::{ }; use data_error::{ArklibError, Result}; -use fs_storage::base_storage::SyncStatus; -use fs_storage::monoid::Monoid; use linked_hash_map::LinkedHashMap; pub struct MemoryLimitedStorage { @@ -23,8 +20,6 @@ pub struct MemoryLimitedStorage { max_memory_items: usize, /// Track disk timestamps only disk_timestamps: BTreeMap, - /// Temporary store for deleted keys until storage is synced - deleted_keys: BTreeSet, } impl MemoryLimitedStorage @@ -36,7 +31,7 @@ where + std::fmt::Display + std::hash::Hash + std::str::FromStr, - V: Clone + serde::Serialize + serde::de::DeserializeOwned + Monoid, + V: Clone + serde::Serialize + serde::de::DeserializeOwned, { pub fn new( label: String, @@ -49,7 +44,6 @@ where memory_cache: LinkedHashMap::with_capacity(max_memory_items), max_memory_items, disk_timestamps: BTreeMap::new(), - deleted_keys: BTreeSet::new(), }; storage.load_fs()?; @@ -57,6 +51,10 @@ where Ok(storage) } + pub fn label(&self) -> String { + self.label.clone() + } + pub fn load_fs(&mut self) -> Result<()> { if !self.path.exists() { return Err(ArklibError::Storage( @@ -172,7 +170,7 @@ where // Try to load from disk let file_path = self.path.join(format!("{}.json", key)); - if file_path.exists() && 
!self.deleted_keys.contains(key) { + if file_path.exists() { let value = self.load_value_from_disk(key)?; self.add_to_memory_cache(key.clone(), value.clone()); return Ok(Some(value)); @@ -195,101 +193,9 @@ where // Then update memory cache self.add_to_memory_cache(key.clone(), value); - self.deleted_keys.remove(&key); - - Ok(()) - } - - pub fn sync(&mut self) -> Result<()> { - // Handle removals - for key in self.deleted_keys.iter() { - let file_path = self.path.join(format!("{}.json", key)); - if file_path.exists() { - fs::remove_file(&file_path)?; - self.disk_timestamps.remove(key); - } - } - - // Collect and sort all entries by timestamp - let mut entries = Vec::new(); - for entry in fs::read_dir(&self.path)? { - let entry = entry?; - let path = entry.path(); - if path.is_file() - && path - .extension() - .map_or(false, |ext| ext == "json") - { - if let Ok(metadata) = fs::metadata(&path) { - if let Ok(modified) = metadata.modified() { - let key = - extract_key_from_file_path(&self.label, &path)?; - if !self.deleted_keys.contains(&key) { - entries.push((key, modified, path)); - } - } - } - } - } - - // Sort by timestamp, newest first - entries.sort_by(|a, b| b.1.cmp(&a.1)); - - // Clear current cache and rebuild with most recent entries - self.memory_cache.clear(); - - // Take most recent entries up to max_memory_items - for (key, timestamp, _) in entries.iter().take(self.max_memory_items) { - if let Ok(value) = self.load_value_from_disk(key) { - self.memory_cache.insert(key.clone(), value); - self.disk_timestamps - .insert(key.clone(), *timestamp); - } - } - - // Track remaining timestamps without loading values - for (key, timestamp, _) in entries.iter().skip(self.max_memory_items) { - self.disk_timestamps - .insert(key.clone(), *timestamp); - } - self.deleted_keys.clear(); Ok(()) } - - pub fn sync_status(&self) -> Result { - // Since we write-through on set(), the only thing that can make storage stale - // is pending deletions - let ram_newer = !self.deleted_keys.is_empty(); - - // Check for new files on disk that we don't know about - let mut disk_newer = false; - for entry in fs::read_dir(&self.path)? 
{ - let entry = entry?; - let path = entry.path(); - if path.is_file() - && path - .extension() - .map_or(false, |ext| ext == "json") - { - let key = extract_key_from_file_path(&self.label, &path)?; - if !self.memory_cache.contains_key(&key) - && !self.disk_timestamps.contains_key(&key) - && !self.deleted_keys.contains(&key) - { - disk_newer = true; - break; - } - } - } - - Ok(match (ram_newer, disk_newer) { - (false, false) => SyncStatus::InSync, - (true, false) => SyncStatus::StorageStale, - (false, true) => SyncStatus::MappingStale, - (true, true) => SyncStatus::Diverge, - }) - } } fn extract_key_from_file_path(label: &str, path: &Path) -> Result From 33091bf215fcade8c4c9d9c992bd9f344212f35d Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Thu, 21 Nov 2024 12:29:51 +0530 Subject: [PATCH 05/10] fix Signed-off-by: Pushkar Mishra --- fs-cache/src/cache.rs | 16 +- fs-cache/src/memory_limited_storage.rs | 199 +++++++++++++++---------- 2 files changed, 136 insertions(+), 79 deletions(-) diff --git a/fs-cache/src/cache.rs b/fs-cache/src/cache.rs index 8fa19620..ddab5a62 100644 --- a/fs-cache/src/cache.rs +++ b/fs-cache/src/cache.rs @@ -32,18 +32,28 @@ where }) } - pub fn get(&mut self, key: &K) -> Result> { - let result = self.storage.get(key)?; + pub fn get(&mut self, key: &K) -> Option { + let result = self.storage.get(key); log::debug!( "{} cache: get key={} -> found={}", self.storage.label(), key, result.is_some() ); - Ok(result) + result } pub fn set(&mut self, key: K, value: V) -> Result<()> { + // Check if value already exists + if self.storage.get(&key).is_some() { + log::debug!( + "{} cache: skip setting existing key={}", + self.storage.label(), + key + ); + return Ok(()); + } + log::debug!("{} cache: set key={}", self.storage.label(), key); self.storage.set(key, value) } diff --git a/fs-cache/src/memory_limited_storage.rs b/fs-cache/src/memory_limited_storage.rs index 7e86a37b..68c0af01 100644 --- a/fs-cache/src/memory_limited_storage.rs +++ b/fs-cache/src/memory_limited_storage.rs @@ -1,10 +1,7 @@ use std::fs::{self, File}; use std::io::Write; +use std::path::{Path, PathBuf}; use std::time::SystemTime; -use std::{ - collections::BTreeMap, - path::{Path, PathBuf}, -}; use data_error::{ArklibError, Result}; use linked_hash_map::LinkedHashMap; @@ -16,10 +13,10 @@ pub struct MemoryLimitedStorage { path: PathBuf, /// In-memory LRU cache combining map and queue functionality memory_cache: LinkedHashMap, - /// Maximum number of items to keep in memory - max_memory_items: usize, - /// Track disk timestamps only - disk_timestamps: BTreeMap, + // Bytes present in memory + current_memory_bytes: usize, + /// Maximum bytes to keep in memory + max_memory_bytes: usize, } impl MemoryLimitedStorage @@ -36,14 +33,14 @@ where pub fn new( label: String, path: &Path, - max_memory_items: usize, + max_memory_bytes: usize, ) -> Result { let mut storage = Self { label, path: PathBuf::from(path), - memory_cache: LinkedHashMap::with_capacity(max_memory_items), - max_memory_items, - disk_timestamps: BTreeMap::new(), + memory_cache: LinkedHashMap::new(), + current_memory_bytes: 0, + max_memory_bytes, }; storage.load_fs()?; @@ -55,6 +52,46 @@ where self.label.clone() } + pub fn get(&mut self, key: &K) -> Option { + // Check memory cache first - will update LRU order automatically + if let Some(value) = self.memory_cache.get_refresh(key) { + return Some(value.clone()); + } + + // Try to load from disk + let file_path = self.path.join(format!("{}.json", key)); + if file_path.exists() { + // Doubt: Update file's 
modiied time (in disk) on read to preserve LRU across app restarts? + match self.load_value_from_disk(key) { + Ok(value) => { + self.add_to_memory_cache(key.clone(), value.clone()); + Some(value) + } + Err(err) => { + log::error!( + "{} cache: failed to load key={}: {}", + self.label, + key, + err + ); + None + } + } + } else { + None + } + } + + pub fn set(&mut self, key: K, value: V) -> Result<()> { + // Always write to disk first + self.write_value_to_disk(&key, &value)?; + + // Then update memory cache + self.add_to_memory_cache(key, value); + + Ok(()) + } + pub fn load_fs(&mut self) -> Result<()> { if !self.path.exists() { return Err(ArklibError::Storage( @@ -70,8 +107,8 @@ where )); } - // Collect all files with their timestamps - let mut entries = Vec::new(); + // First pass: collect metadata only + let mut file_metadata = Vec::new(); for entry in fs::read_dir(&self.path)? { let entry = entry?; let path = entry.path(); @@ -81,57 +118,67 @@ where .map_or(false, |ext| ext == "json") { if let Ok(metadata) = fs::metadata(&path) { - if let Ok(modified) = metadata.modified() { - let key: K = - extract_key_from_file_path(&self.label, &path)?; - entries.push((key, modified, path)); - } + let key = extract_key_from_file_path(&self.label, &path)?; + file_metadata.push((key, metadata.len() as usize)); } } } - // Sort by timestamp, newest first - entries.sort_by(|a, b| b.1.cmp(&a.1)); + // Sort by timestamp (newest first) before loading any values + file_metadata.sort_by(|a, b| b.1.cmp(&a.1)); - // Clear current cache and timestamps + // Clear existing cache self.memory_cache.clear(); - self.disk_timestamps.clear(); + self.current_memory_bytes = 0; - // Load only up to max_memory_items, newest first - for (key, timestamp, path) in entries.iter().take(self.max_memory_items) - { - match File::open(path) { - Ok(file) => { - if let Ok(value) = serde_json::from_reader(file) { - self.memory_cache.insert(key.clone(), value); + // TODO: Need some work here + // Second pass: load only the values that will fit in memory + let mut loaded_bytes = 0; + let mut total_bytes = 0; + + for (key, approx_size) in file_metadata { + total_bytes += approx_size; + + // Only load value if it will likely fit in memory + if loaded_bytes + approx_size <= self.max_memory_bytes { + match self.load_value_from_disk(&key) { + Ok(value) => { + let actual_size = Self::estimate_size(&value); + if loaded_bytes + actual_size <= self.max_memory_bytes { + self.memory_cache.insert(key, value); + loaded_bytes += actual_size; + } + } + Err(err) => { + log::warn!( + "{} cache: failed to load key={}: {}", + self.label, + key, + err + ); } - } - Err(err) => { - log::warn!("Failed to read file for key {}: {}", key, err); - continue; } } - self.disk_timestamps - .insert(key.clone(), *timestamp); } - // TODO: WHY?: Later used in sync-status to detect is it recently externally modified or not - // Add remaining timestamps to disk_timestamps without loading values - for (key, timestamp, _) in entries.iter().skip(self.max_memory_items) { - self.disk_timestamps - .insert(key.clone(), *timestamp); - } + self.current_memory_bytes = loaded_bytes; - log::info!( - "{} loaded {} items in memory, {} total on disk", + log::debug!( + "{} loaded {}/{} bytes in memory", self.label, - self.memory_cache.len(), - self.disk_timestamps.len() + self.current_memory_bytes, + total_bytes ); Ok(()) } + fn estimate_size(value: &V) -> usize { + serde_json::to_vec(value) + .map(|v| v.len()) + .unwrap_or(0) + } + // Write a single value to disk fn 
write_value_to_disk(&mut self, key: &K, value: &V) -> Result<()> { let file_path = self.path.join(format!("{}.json", key)); @@ -143,9 +190,6 @@ where file.set_modified(new_timestamp)?; file.sync_all()?; - self.disk_timestamps - .insert(key.clone(), new_timestamp); - Ok(()) } @@ -162,39 +206,42 @@ where Ok(value) } - pub fn get(&mut self, key: &K) -> Result> { - // Check memory cache first - will update LRU order automatically - if let Some(value) = self.memory_cache.get_refresh(key) { - return Ok(Some(value.clone())); - } + fn add_to_memory_cache(&mut self, key: K, value: V) { + let value_size = Self::estimate_size(&value); - // Try to load from disk - let file_path = self.path.join(format!("{}.json", key)); - if file_path.exists() { - let value = self.load_value_from_disk(key)?; - self.add_to_memory_cache(key.clone(), value.clone()); - return Ok(Some(value)); + // If single value is larger than total limit, just skip memory caching + if value_size > self.max_memory_bytes { + log::debug!( + "{} cache: value size {} exceeds limit {}", + self.label, + value_size, + self.max_memory_bytes + ); + return; } - Ok(None) - } - - fn add_to_memory_cache(&mut self, key: K, value: V) { - // If at capacity, LinkedHashMap will remove oldest entry automatically - if self.memory_cache.len() >= self.max_memory_items { - self.memory_cache.pop_front(); // Removes least recently used + // Remove oldest entries until we have space for new value + while self.current_memory_bytes + value_size > self.max_memory_bytes + && !self.memory_cache.is_empty() + { + if let Some((_, old_value)) = self.memory_cache.pop_front() { + self.current_memory_bytes = self + .current_memory_bytes + .saturating_sub(Self::estimate_size(&old_value)); + } } - self.memory_cache.insert(key, value); - } - - pub fn set(&mut self, key: K, value: V) -> Result<()> { - // Always write to disk first - self.write_value_to_disk(&key, &value)?; - // Then update memory cache - self.add_to_memory_cache(key.clone(), value); + // Add new value and update size + self.memory_cache.insert(key, value); + self.current_memory_bytes += value_size; - Ok(()) + log::debug!( + "{} cache: added {} bytes, total {}/{}", + self.label, + value_size, + self.current_memory_bytes, + self.max_memory_bytes + ); } } From 60225f4d6abbd4eb6291878147cb3af783b58e9a Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Sun, 1 Dec 2024 01:25:17 +0530 Subject: [PATCH 06/10] major changes Signed-off-by: Pushkar Mishra --- README.md | 25 +- fs-atomic-versions/src/atomic/file.rs | 2 +- fs-cache/Cargo.toml | 9 +- fs-cache/src/cache.rs | 493 +++++++++++++++++++++++-- fs-cache/src/lib.rs | 1 - fs-cache/src/memory_limited_storage.rs | 273 -------------- fs-storage/src/file_storage.rs | 11 +- fs-storage/src/folder_storage.rs | 41 +- fs-storage/src/lib.rs | 2 +- fs-storage/src/utils.rs | 56 ++- 10 files changed, 541 insertions(+), 372 deletions(-) delete mode 100644 fs-cache/src/memory_limited_storage.rs diff --git a/README.md b/README.md index 8a472b50..d9ab90e8 100644 --- a/README.md +++ b/README.md @@ -28,18 +28,19 @@ The core crate is `fs-index` which provides us with [content addressing](https:/
-| Package         | Description                               |
-| --------------- | ----------------------------------------- |
-| `ark-cli`       | The CLI tool to interact with ark crates  |
-| `data-resource` | Resource hashing and ID construction      |
-| `fs-index`      | Resource Index construction and updating  |
-| `fs-storage`    | Filesystem storage for resources          |
-| `fs-metadata`   | Metadata management                       |
-| `fs-properties` | Properties management                     |
-| `data-link`     | Linking resources                         |
-| `data-pdf`      | PDF handling                              |
-| `data-error`    | Error handling                            |
-| `data-json`     | JSON serialization and deserialization    |
+| Package         | Description                                |
+| --------------- | ------------------------------------------ |
+| `ark-cli`       | The CLI tool to interact with ark crates   |
+| `data-resource` | Resource hashing and ID construction       |
+| `fs-cache`      | Memory and disk caching with LRU eviction  |
+| `fs-index`      | Resource Index construction and updating   |
+| `fs-storage`    | Filesystem storage for resources           |
+| `fs-metadata`   | Metadata management                        |
+| `fs-properties` | Properties management                      |
+| `data-link`     | Linking resources                          |
+| `data-pdf`      | PDF handling                               |
+| `data-error`    | Error handling                             |
+| `data-json`     | JSON serialization and deserialization     |
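
As a quick orientation, here is a minimal usage sketch of the `Cache` API that this series converges on (a hypothetical example, not part of the patch: the `Blob` type and the `/tmp` path are invented for illustration; note that `Cache::new` runs `load_fs`, which rejects a missing folder, so the directory must exist first):

use std::{fs, path::Path};

use data_error::Result;
use fs_cache::cache::Cache;

// Hypothetical value type: `Cache<K, V>` only needs
// `V: Clone + Debug + AsRef<[u8]> + From<Vec<u8>>`.
#[derive(Clone, Debug)]
struct Blob(Vec<u8>);

impl AsRef<[u8]> for Blob {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl From<Vec<u8>> for Blob {
    fn from(bytes: Vec<u8>) -> Self {
        Blob(bytes)
    }
}

fn main() -> Result<()> {
    // `load_fs` errors on a missing folder, so create it up front.
    let dir = Path::new("/tmp/ark-cache-demo");
    fs::create_dir_all(dir)?;

    // 1 MiB in-memory budget; every value is also persisted to disk.
    let mut cache: Cache<String, Blob> =
        Cache::new("demo".to_string(), dir, 1024 * 1024)?;

    cache.set("key1".to_string(), Blob(vec![1, 2, 3]))?;

    // Hot reads are served from the LRU map; evicted or cold values
    // are transparently reloaded from disk by `get`.
    assert!(cache.get(&"key1".to_string()).is_some());
    Ok(())
}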
diff --git a/fs-atomic-versions/src/atomic/file.rs b/fs-atomic-versions/src/atomic/file.rs index a2b708ab..4947b77e 100644 --- a/fs-atomic-versions/src/atomic/file.rs +++ b/fs-atomic-versions/src/atomic/file.rs @@ -126,7 +126,7 @@ impl AtomicFile { /// Return the latest version together with vector of the /// files matching this version. Multiple files for the same version - /// can appear due to usage of file syncronization. Different devices + /// can appear due to usage of file synchronization. Different devices /// can create same version simultaneously. pub fn latest_version(&self) -> Result<(usize, Vec)> { let files_iterator = fs::read_dir(&self.directory)?.flatten(); diff --git a/fs-cache/Cargo.toml b/fs-cache/Cargo.toml index acd38473..0242f359 100644 --- a/fs-cache/Cargo.toml +++ b/fs-cache/Cargo.toml @@ -10,15 +10,10 @@ bench = false [dependencies] log = { version = "0.4.17", features = ["release_max_level_off"] } -serde_json = "1.0.82" -serde = { version = "1.0.138", features = ["derive"] } data-error = { path = "../data-error" } -data-resource = { path = "../data-resource" } fs-storage = { path = "../fs-storage"} -linked-hash-map = "0.5.6" +lru = "0.12.5" [dev-dependencies] -anyhow = "1.0.81" -quickcheck = { version = "1.0.3", features = ["use_logging"] } -quickcheck_macros = "1.0.0" tempdir = "0.3.7" +rstest = "0.23" diff --git a/fs-cache/src/cache.rs b/fs-cache/src/cache.rs index ddab5a62..86b24428 100644 --- a/fs-cache/src/cache.rs +++ b/fs-cache/src/cache.rs @@ -1,60 +1,489 @@ -use crate::memory_limited_storage::MemoryLimitedStorage; -use data_error::Result; -use std::path::Path; +use std::fs; +use std::num::NonZeroUsize; +use std::path::{Path, PathBuf}; +use data_error::{ArklibError, Result}; +use fs_storage::utils::extract_key_from_file_path; +use lru::LruCache; + +/// A cache entry that stores a value and its size in bytes. +/// +/// This structure is used to track both the actual data (value) +/// and its memory usage (size) in the cache. +struct CacheEntry { + value: V, + size: usize, +} + +/// A combined in-memory and disk-based cache system. +/// +/// This cache uses an LRU (Least Recently Used) eviction policy for the +/// in-memory portion and persists data to disk for long-term storage. pub struct Cache { - storage: MemoryLimitedStorage, + /// Label for logging + label: String, + /// Path to the underlying folder where data is persisted + path: PathBuf, + /// An in-memory LRU cache for quick access to frequently used items. + memory_cache: LruCache>, + /// The current memory usage in bytes. + current_memory_bytes: usize, + /// The maximum allowable memory usage in bytes. + max_memory_bytes: usize, } impl Cache where - K: Ord - + Clone - + serde::Serialize - + serde::de::DeserializeOwned - + std::fmt::Display - + std::hash::Hash - + std::str::FromStr, - V: Clone + serde::Serialize + serde::de::DeserializeOwned, + K: Ord + Clone + std::fmt::Display + std::hash::Hash + std::str::FromStr, + V: Clone + std::fmt::Debug + AsRef<[u8]> + From>, { + /// Creates a new cache with the given label, storage path, and memory limit. 
pub fn new( label: String, path: &Path, max_memory_bytes: usize, ) -> Result { + let mut cache = Self { + label: label.clone(), + path: PathBuf::from(path), + // TODO: NEED FIX + memory_cache: LruCache::new( + NonZeroUsize::new(max_memory_bytes) + .expect("Capacity can't be zero"), + ), + current_memory_bytes: 0, + max_memory_bytes, + }; + log::debug!( - "{} cache initialized with {} bytes limit", + "cache/{}: initialized with {} bytes limit", label, max_memory_bytes ); - Ok(Self { - storage: MemoryLimitedStorage::new(label, path, max_memory_bytes)?, - }) + + cache.load_fs()?; + Ok(cache) } + /// Retrieves a value by key from memory cache or disk, returns None if not found. pub fn get(&mut self, key: &K) -> Option { - let result = self.storage.get(key); - log::debug!( - "{} cache: get key={} -> found={}", - self.storage.label(), - key, - result.is_some() - ); - result + log::debug!("cache/{}: retrieving value for key {}", self.label, key); + + if let Some(value) = self.get_from_memory(key) { + return Some(value); + } + + self.get_from_disk(key) } + /// Stores a value with the given key, writing to disk and updating memory cache. pub fn set(&mut self, key: K, value: V) -> Result<()> { + log::debug!("cache/{}: setting value for key {}", self.label, key); // Check if value already exists - if self.storage.get(&key).is_some() { - log::debug!( - "{} cache: skip setting existing key={}", - self.storage.label(), - key - ); + if self.get(&key).is_some() { + log::debug!("cache/{}: skipping existing key {}", self.label, key); return Ok(()); } - log::debug!("{} cache: set key={}", self.storage.label(), key); - self.storage.set(key, value) + // Always write to disk first + self.write_to_disk(&key, &value)?; + + // Then update memory cache + self.cache_in_memory(&key, &value)?; + + log::debug!("cache/{}: set key={}", self.label, key); + Ok(()) + } + + fn load_fs(&mut self) -> Result<()> { + if !self.path.exists() { + return Err(ArklibError::Storage( + self.label.clone(), + "Folder does not exist".to_owned(), + )); + } + + if !self.path.is_dir() { + return Err(ArklibError::Storage( + self.label.clone(), + "Path is not a directory".to_owned(), + )); + } + + // First pass: collect metadata only + let mut file_metadata = Vec::new(); + for entry in fs::read_dir(&self.path)? 
{ + let path = entry?.path(); + if path.is_file() { + let key: K = + extract_key_from_file_path(&self.label, &path, true)?; + file_metadata.push((key.clone(), self.get_file_size(&key)?)); + } + } + + // Sort by size before loading + file_metadata.sort_by(|a, b| b.1.cmp(&a.1)); + + // Clear existing cache + self.memory_cache.clear(); + self.current_memory_bytes = 0; + + // Load files that fit in memory + let mut loaded_bytes = 0; + let total_bytes: usize = + file_metadata.iter().map(|(_, size)| size).sum(); + + for (key, approx_size) in file_metadata { + if loaded_bytes + approx_size <= self.max_memory_bytes { + match self.load_value_from_disk(&key) { + Ok(value) => { + // let actual_size = Self::estimate_size(&value); + let actual_size = self.get_file_size(&key)?; + if loaded_bytes + actual_size <= self.max_memory_bytes { + self.memory_cache.put( + key, + CacheEntry { + value, + size: actual_size, + }, + ); + loaded_bytes += actual_size; + } + } + Err(err) => { + log::warn!( + "cache/{}: failed to load key={}: {}", + self.label, + key, + err + ); + } + } + } + } + + self.current_memory_bytes = loaded_bytes; + + log::debug!( + "cache/{}: loaded {}/{} bytes in memory", + self.label, + self.current_memory_bytes, + total_bytes + ); + + Ok(()) + } + + fn get_from_memory(&mut self, key: &K) -> Option { + self.memory_cache + .get(key) + .map(|entry| entry.value.clone()) + } + + fn get_from_disk(&mut self, key: &K) -> Option { + let file_path = self.path.join(key.to_string()); + if !file_path.exists() { + log::warn!("cache/{}: no value found for key {}", self.label, key); + return None; + } + + match self.load_value_from_disk(key) { + Ok(value) => { + if let Err(err) = self.cache_in_memory(key, &value) { + log::error!( + "cache/{}: failed to add to memory cache for key {}: {}", + self.label, + key, + err + ); + return None; + } + Some(value) + } + Err(err) => { + log::error!( + "cache/{}: failed to load from disk for key {}: {}", + self.label, + key, + err + ); + None + } + } + } + + fn write_to_disk(&mut self, key: &K, value: &V) -> Result<()> { + log::debug!("cache/{}: writing to disk for key {}", self.label, key); + fs::create_dir_all(&self.path)?; + + let file_path = self.path.join(key.to_string()); + debug_assert!( + !file_path.exists(), + "File {} should not exist before writing", + file_path.display() + ); + + fs::write(&file_path, value.as_ref()).map_err(|err| { + ArklibError::Storage( + self.label.clone(), + format!("Failed to write value for key {}: {}", key, err), + ) + }) + } + + fn load_value_from_disk(&self, key: &K) -> Result + where + V: From>, // Add trait bound for reading binary data + { + let file_path = self.path.join(key.to_string()); + let contents = fs::read(&file_path)?; + Ok(V::from(contents)) + } + + fn get_file_size(&self, key: &K) -> Result { + Ok(fs::metadata(self.path.join(key.to_string()))?.len() as usize) + } + + fn cache_in_memory(&mut self, key: &K, value: &V) -> Result<()> { + log::debug!("cache/{}: caching in memory for key {}", self.label, key); + let size = self.get_file_size(key)?; + + // If single value is larger than total limit, just skip memory caching + if size > self.max_memory_bytes { + return Err(ArklibError::Storage( + self.label.clone(), + format!( + "value size {} exceeds limit {}", + size, self.max_memory_bytes + ), + )); + } + + // Remove oldest entries until we have space for new value + while self.current_memory_bytes + size > self.max_memory_bytes { + let (_, old_entry) = self + .memory_cache + .pop_lru() + .expect("Cache should have entries 
to evict"); + debug_assert!( + self.current_memory_bytes >= old_entry.size, + "Memory tracking inconsistency detected" + ); + self.current_memory_bytes = self + .current_memory_bytes + .saturating_sub(old_entry.size); + } + + // Add new value and update size + self.memory_cache.put( + key.clone(), + CacheEntry { + value: value.clone(), + size, + }, + ); + self.current_memory_bytes += size; + + log::debug!( + "cache/{}: added {} bytes, total {}/{}", + self.label, + size, + self.current_memory_bytes, + self.max_memory_bytes + ); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rstest::{fixture, rstest}; + use std::fs; + use tempdir::TempDir; + + // Helper struct that implements required traits + #[derive(Clone, Debug, PartialEq)] + struct TestValue(Vec); + + impl AsRef<[u8]> for TestValue { + fn as_ref(&self) -> &[u8] { + &self.0 + } + } + + impl From> for TestValue { + fn from(bytes: Vec) -> Self { + TestValue(bytes) + } + } + + #[fixture] + fn temp_dir() -> TempDir { + TempDir::new("tmp").expect("Failed to create temporary directory") + } + + #[fixture] + fn cache(temp_dir: TempDir) -> Cache { + Cache::new( + "test".to_string(), + temp_dir.path(), + 1024 * 1024, // 1MB + ) + .expect("Failed to create cache") + } + + #[rstest] + fn test_new_cache(cache: Cache) { + assert_eq!(cache.current_memory_bytes, 0); + assert_eq!(cache.max_memory_bytes, 1024 * 1024); + } + + #[rstest] + fn test_set_and_get(mut cache: Cache) { + let key = "test_key".to_string(); + let value = TestValue(vec![1, 2, 3, 4]); + + // Test set + cache + .set(key.clone(), value.clone()) + .expect("Failed to set value"); + + // Test get + let retrieved = cache.get(&key).expect("Failed to get value"); + assert_eq!(retrieved.0, value.0); + } + + #[rstest] + fn test_get_nonexistent(mut cache: Cache) { + assert!(cache.get(&"nonexistent".to_string()).is_none()); + } + + #[rstest] + fn test_memory_eviction(temp_dir: TempDir) { + // Create cache with small memory limit + let mut cache = Cache::new( + "test".to_string(), + temp_dir.path(), + 5, // Very small limit to force eviction + ) + .expect("Failed to create cache"); + + // Add first value + let key1 = "key1".to_string(); + let value1 = TestValue(vec![1, 2, 3, 4]); + cache + .set(key1.clone(), value1.clone()) + .expect("Failed to set value1"); + + // Add second value to trigger eviction + let key2 = "key2".to_string(); + let value2 = TestValue(vec![5, 6, 7, 8]); + cache + .set(key2.clone(), value2.clone()) + .expect("Failed to set value2"); + + // First value should be evicted from memory but still on disk + assert!(cache.memory_cache.get(&key1).is_none()); + assert_eq!(cache.get(&key1).unwrap(), value1); // Should load from disk + } + + #[rstest] + fn test_large_value_handling(mut cache: Cache) { + // let (mut cache, _dir) = setup_temp_cache(); + let key = "large_key".to_string(); + let large_value = TestValue(vec![0; 2 * 1024 * 1024]); // 2MB, larger than cache + + // Should fail to cache in memory but succeed in writing to disk + assert!(cache + .set(key.clone(), large_value.clone()) + .is_err()); + } + + #[rstest] + fn test_persistence(temp_dir: TempDir) { + let key = "persist_key".to_string(); + let value = TestValue(vec![1, 2, 3, 4]); + + // Scope for first cache instance + { + let mut cache = + Cache::new("test".to_string(), temp_dir.path(), 1024) + .expect("Failed to create first cache"); + cache + .set(key.clone(), value.clone()) + .expect("Failed to set value"); + } + + // Create new cache instance pointing to same directory + let mut cache2 = 
Cache::new("test".to_string(), temp_dir.path(), 1024) + .expect("Failed to create second cache"); + + // Should be able to read value written by first instance + let retrieved: TestValue = + cache2.get(&key).expect("Failed to get value"); + assert_eq!(retrieved.0, value.0); + } + + #[rstest] + fn test_duplicate_set(mut cache: Cache) { + let key = "dup_key".to_string(); + let value1 = TestValue(vec![1, 2, 3, 4]); + let value2 = TestValue(vec![5, 6, 7, 8]); + + // First set + cache + .set(key.clone(), value1.clone()) + .expect("Failed to set first value"); + + // Second set with same key should be skipped + cache + .set(key.clone(), value2) + .expect("Failed to set second value"); + + // Should still have first value + let retrieved = cache.get(&key).expect("Failed to get value"); + assert_eq!(retrieved.0, value1.0); + } + + #[rstest] + fn test_load_fs(temp_dir: TempDir) { + let path = temp_dir.path(); + + // Manually create some files + fs::write(path.join("key1"), vec![1, 2, 3]) + .expect("Failed to write file 1"); + fs::write(path.join("key2"), vec![4, 5, 6]) + .expect("Failed to write file 2"); + + // Create new cache instance to load existing files + let mut cache2: Cache = + Cache::new("test".to_string(), path, 1024) + .expect("Failed to create cache"); + + // Check if files were loaded + assert!(cache2.get(&"key1".to_string()).is_some()); + assert!(cache2.get(&"key2".to_string()).is_some()); + } + + #[rstest] + #[should_panic(expected = "Capacity can't be zero")] + fn test_zero_capacity(temp_dir: TempDir) { + let _cache: std::result::Result, ArklibError> = + Cache::new("test".to_string(), temp_dir.path(), 0); + } + + #[rstest] + fn test_memory_tracking(mut cache: Cache) { + let key = "track_key".to_string(); + let value = TestValue(vec![1, 2, 3, 4]); // 4 bytes + + cache + .set(key.clone(), value) + .expect("Failed to set value"); + + // Memory usage should match file size + assert_eq!(cache.current_memory_bytes, 4); } } diff --git a/fs-cache/src/lib.rs b/fs-cache/src/lib.rs index 4f916de7..a5c08fdb 100644 --- a/fs-cache/src/lib.rs +++ b/fs-cache/src/lib.rs @@ -1,2 +1 @@ pub mod cache; -pub mod memory_limited_storage; diff --git a/fs-cache/src/memory_limited_storage.rs b/fs-cache/src/memory_limited_storage.rs deleted file mode 100644 index 68c0af01..00000000 --- a/fs-cache/src/memory_limited_storage.rs +++ /dev/null @@ -1,273 +0,0 @@ -use std::fs::{self, File}; -use std::io::Write; -use std::path::{Path, PathBuf}; -use std::time::SystemTime; - -use data_error::{ArklibError, Result}; -use linked_hash_map::LinkedHashMap; - -pub struct MemoryLimitedStorage { - /// Label for logging - label: String, - /// Path to the underlying folder where data is persisted - path: PathBuf, - /// In-memory LRU cache combining map and queue functionality - memory_cache: LinkedHashMap, - // Bytes present in memory - current_memory_bytes: usize, - /// Maximum bytes to keep in memory - max_memory_bytes: usize, -} - -impl MemoryLimitedStorage -where - K: Ord - + Clone - + serde::Serialize - + serde::de::DeserializeOwned - + std::fmt::Display - + std::hash::Hash - + std::str::FromStr, - V: Clone + serde::Serialize + serde::de::DeserializeOwned, -{ - pub fn new( - label: String, - path: &Path, - max_memory_bytes: usize, - ) -> Result { - let mut storage = Self { - label, - path: PathBuf::from(path), - memory_cache: LinkedHashMap::new(), - current_memory_bytes: 0, - max_memory_bytes, - }; - - storage.load_fs()?; - - Ok(storage) - } - - pub fn label(&self) -> String { - self.label.clone() - } - - pub fn get(&mut 
self, key: &K) -> Option { - // Check memory cache first - will update LRU order automatically - if let Some(value) = self.memory_cache.get_refresh(key) { - return Some(value.clone()); - } - - // Try to load from disk - let file_path = self.path.join(format!("{}.json", key)); - if file_path.exists() { - // Doubt: Update file's modiied time (in disk) on read to preserve LRU across app restarts? - match self.load_value_from_disk(key) { - Ok(value) => { - self.add_to_memory_cache(key.clone(), value.clone()); - Some(value) - } - Err(err) => { - log::error!( - "{} cache: failed to load key={}: {}", - self.label, - key, - err - ); - None - } - } - } else { - None - } - } - - pub fn set(&mut self, key: K, value: V) -> Result<()> { - // Always write to disk first - self.write_value_to_disk(&key, &value)?; - - // Then update memory cache - self.add_to_memory_cache(key, value); - - Ok(()) - } - - pub fn load_fs(&mut self) -> Result<()> { - if !self.path.exists() { - return Err(ArklibError::Storage( - self.label.clone(), - "Folder does not exist".to_owned(), - )); - } - - if !self.path.is_dir() { - return Err(ArklibError::Storage( - self.label.clone(), - "Path is not a directory".to_owned(), - )); - } - - // First pass: collect metadata only - let mut file_metadata = Vec::new(); - for entry in fs::read_dir(&self.path)? { - let entry = entry?; - let path = entry.path(); - if path.is_file() - && path - .extension() - .map_or(false, |ext| ext == "json") - { - if let Ok(metadata) = fs::metadata(&path) { - let key = extract_key_from_file_path(&self.label, &path)?; - file_metadata.push((key, metadata.len() as usize)); - } - } - } - - // Sort by timestamp (newest first) before loading any values - file_metadata.sort_by(|a, b| b.1.cmp(&a.1)); - - // Clear existing cache - self.memory_cache.clear(); - self.current_memory_bytes = 0; - - // TODO: Need some work here - // Second pass: load only the values that will fit in memory - let mut loaded_bytes = 0; - let mut total_bytes = 0; - - for (key, approx_size) in file_metadata { - total_bytes += approx_size; - - // Only load value if it will likely fit in memory - if loaded_bytes + approx_size <= self.max_memory_bytes { - match self.load_value_from_disk(&key) { - Ok(value) => { - let actual_size = Self::estimate_size(&value); - if loaded_bytes + actual_size <= self.max_memory_bytes { - self.memory_cache.insert(key, value); - loaded_bytes += actual_size; - } - } - Err(err) => { - log::warn!( - "{} cache: failed to load key={}: {}", - self.label, - key, - err - ); - } - } - } - } - - self.current_memory_bytes = loaded_bytes; - - log::debug!( - "{} loaded {}/{} bytes in memory", - self.label, - self.current_memory_bytes, - total_bytes - ); - - Ok(()) - } - - fn estimate_size(value: &V) -> usize { - serde_json::to_vec(value) - .map(|v| v.len()) - .unwrap_or(0) - } - - // Write a single value to disk - fn write_value_to_disk(&mut self, key: &K, value: &V) -> Result<()> { - let file_path = self.path.join(format!("{}.json", key)); - let mut file = File::create(&file_path)?; - file.write_all(serde_json::to_string_pretty(&value)?.as_bytes())?; - file.flush()?; - - let new_timestamp = SystemTime::now(); - file.set_modified(new_timestamp)?; - file.sync_all()?; - - Ok(()) - } - - // Load a single value from disk - fn load_value_from_disk(&self, key: &K) -> Result { - let file_path = self.path.join(format!("{}.json", key)); - let file = File::open(&file_path)?; - let value: V = serde_json::from_reader(file).map_err(|err| { - ArklibError::Storage( - self.label.clone(), - 
format!("Failed to read value for key {}: {}", key, err), - ) - })?; - Ok(value) - } - - fn add_to_memory_cache(&mut self, key: K, value: V) { - let value_size = Self::estimate_size(&value); - - // If single value is larger than total limit, just skip memory caching - if value_size > self.max_memory_bytes { - log::debug!( - "{} cache: value size {} exceeds limit {}", - self.label, - value_size, - self.max_memory_bytes - ); - return; - } - - // Remove oldest entries until we have space for new value - while self.current_memory_bytes + value_size > self.max_memory_bytes - && !self.memory_cache.is_empty() - { - if let Some((_, old_value)) = self.memory_cache.pop_front() { - self.current_memory_bytes = self - .current_memory_bytes - .saturating_sub(Self::estimate_size(&old_value)); - } - } - - // Add new value and update size - self.memory_cache.insert(key, value); - self.current_memory_bytes += value_size; - - log::debug!( - "{} cache: added {} bytes, total {}/{}", - self.label, - value_size, - self.current_memory_bytes, - self.max_memory_bytes - ); - } -} - -fn extract_key_from_file_path(label: &str, path: &Path) -> Result -where - K: std::str::FromStr, -{ - path.file_stem() - .ok_or_else(|| { - ArklibError::Storage( - label.to_owned(), - "Failed to extract file stem from filename".to_owned(), - ) - })? - .to_str() - .ok_or_else(|| { - ArklibError::Storage( - label.to_owned(), - "Failed to convert file stem to string".to_owned(), - ) - })? - .parse::() - .map_err(|_| { - ArklibError::Storage( - label.to_owned(), - "Failed to parse key from filename".to_owned(), - ) - }) -} diff --git a/fs-storage/src/file_storage.rs b/fs-storage/src/file_storage.rs index 2e3332e9..f4676984 100644 --- a/fs-storage/src/file_storage.rs +++ b/fs-storage/src/file_storage.rs @@ -1,8 +1,7 @@ use serde::{Deserialize, Serialize}; use std::{ collections::BTreeMap, - fs::{self, File}, - io::Write, + fs, path::{Path, PathBuf}, time::SystemTime, }; @@ -10,7 +9,7 @@ use std::{ use crate::{ base_storage::{BaseStorage, SyncStatus}, monoid::Monoid, - utils::read_version_2_fs, + utils::{read_version_2_fs, write_json_file}, }; use data_error::{ArklibError, Result}; @@ -262,13 +261,9 @@ where ) })?; fs::create_dir_all(parent_dir)?; - let mut file = File::create(&self.path)?; - file.write_all(serde_json::to_string_pretty(&self.data)?.as_bytes())?; - file.flush()?; let new_timestamp = SystemTime::now(); - file.set_modified(new_timestamp)?; - file.sync_all()?; + write_json_file(&self.path, &self.data, new_timestamp)?; self.modified = new_timestamp; self.written_to_disk = new_timestamp; diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index 68892a49..e1d147d2 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -1,6 +1,5 @@ use std::collections::BTreeSet; use std::fs::{self, File}; -use std::io::Write; use std::time::SystemTime; use std::{ collections::BTreeMap, @@ -9,6 +8,7 @@ use std::{ use crate::base_storage::{BaseStorage, SyncStatus}; use crate::monoid::Monoid; +use crate::utils::{extract_key_from_file_path, write_json_file}; use data_error::{ArklibError, Result}; /// Represents a folder storage system that persists data to disk. 
@@ -92,7 +92,8 @@ where
                 .extension()
                 .map_or(false, |ext| ext == "json")
             {
-                let key: K = extract_key_from_file_path(&self.label, &path)?;
+                let key: K =
+                    extract_key_from_file_path(&self.label, &path, false)?;
                 let file = File::open(&path)?;
                 let value: V =
                     serde_json::from_reader(file).map_err(|err| {
@@ -280,7 +281,8 @@ where
                 .extension()
                 .map_or(false, |ext| ext == "json")
             {
-                let key = extract_key_from_file_path(&self.label, &path)?;
+                let key =
+                    extract_key_from_file_path(&self.label, &path, false)?;
                 if !self.data.contains_key(&key)
                     && !self.deleted_keys.contains(&key)
                 {
@@ -348,13 +350,9 @@ where
 
         for (key, value) in &self.data {
             let file_path = self.path.join(format!("{}.json", key));
-            let mut file = File::create(&file_path)?;
-            file.write_all(serde_json::to_string_pretty(&value)?.as_bytes())?;
-            file.flush()?;
 
             let new_timestamp = SystemTime::now();
-            file.set_modified(new_timestamp)?;
-            file.sync_all()?;
+            write_json_file(&file_path, &value, new_timestamp)?;
 
             self.timestamps
                 .insert(key.clone(), (new_timestamp, new_timestamp));
@@ -414,33 +412,6 @@ where
     }
 }
 
-fn extract_key_from_file_path<K>(label: &str, path: &Path) -> Result<K>
-where
-    K: std::str::FromStr,
-{
-    path.file_stem()
-        .ok_or_else(|| {
-            ArklibError::Storage(
-                label.to_owned(),
-                "Failed to extract file stem from filename".to_owned(),
-            )
-        })?
-        .to_str()
-        .ok_or_else(|| {
-            ArklibError::Storage(
-                label.to_owned(),
-                "Failed to convert file stem to string".to_owned(),
-            )
-        })?
-        .parse::<K>()
-        .map_err(|_| {
-            ArklibError::Storage(
-                label.to_owned(),
-                "Failed to parse key from filename".to_owned(),
-            )
-        })
-}
-
 #[cfg(test)]
 mod tests {
     use crate::{
diff --git a/fs-storage/src/lib.rs b/fs-storage/src/lib.rs
index 83e3868c..28f53603 100644
--- a/fs-storage/src/lib.rs
+++ b/fs-storage/src/lib.rs
@@ -6,7 +6,7 @@ pub mod folder_storage;
 pub mod jni;
 pub mod monoid;
 
-mod utils;
+pub mod utils;
 
 pub const ARK_FOLDER: &str = ".ark";
 
 // Should not be lost if possible
diff --git a/fs-storage/src/utils.rs b/fs-storage/src/utils.rs
index d1e818bd..f20d860d 100644
--- a/fs-storage/src/utils.rs
+++ b/fs-storage/src/utils.rs
@@ -1,5 +1,7 @@
-use data_error::Result;
-use std::{collections::BTreeMap, path::Path};
+use data_error::{ArklibError, Result};
+use serde::Serialize;
+use std::io::Write;
+use std::{collections::BTreeMap, fs::File, path::Path, time::SystemTime};
 
 /// Parses version 2 `FileStorage` format and returns the data as a BTreeMap
 ///
@@ -51,6 +53,56 @@ where
     Ok(data)
 }
 
+/// Writes a serializable value to `path` as pretty-printed JSON and stamps
+/// the file with the given modification `time`.
+pub fn write_json_file<T: Serialize>(
+    path: &Path,
+    value: &T,
+    time: SystemTime,
+) -> Result<()> {
+    let mut file = File::create(path)?;
+    file.write_all(serde_json::to_string_pretty(value)?.as_bytes())?;
+    file.flush()?;
+
+    file.set_modified(time)?;
+    file.sync_all()?;
+
+    Ok(())
+}
+
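+/// Derives a storage key from a file path: `tmp/foo.json` yields `foo.json`
+/// when `include_extension` is true and `foo` otherwise. A usage sketch
+/// (illustrative, not part of the original patch):
+///
+/// ```ignore
+/// use std::path::Path;
+///
+/// let key: String =
+///     extract_key_from_file_path("demo", Path::new("tmp/foo.json"), false)
+///         .unwrap();
+/// assert_eq!(key, "foo");
+/// ```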
+pub fn extract_key_from_file_path<K>(
+    label: &str,
+    path: &Path,
+    include_extension: bool,
+) -> Result<K>
+where
+    K: std::str::FromStr,
+{
+    match include_extension {
+        true => path.file_name(), // ("tmp/foo.txt").file_name() -> ("foo.txt")
+        false => path.file_stem(), // ("tmp/foo.txt").file_stem() -> ("foo")
+    }
+    .ok_or_else(|| {
+        ArklibError::Storage(
+            label.to_owned(),
+            "Failed to extract file stem from filename".to_owned(),
+        )
+    })?
+    .to_str()
+    .ok_or_else(|| {
+        ArklibError::Storage(
+            label.to_owned(),
+            "Failed to convert file stem to string".to_owned(),
+        )
+    })?
+    .parse::<K>()
+    .map_err(|_| {
+        ArklibError::Storage(
+            label.to_owned(),
+            "Failed to parse key from filename".to_owned(),
+        )
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

From 39d9bb06ae0dda0a970d0de3b3dc25263c83d425 Mon Sep 17 00:00:00 2001
From: Pushkar Mishra
Date: Sun, 1 Dec 2024 17:18:16 +0530
Subject: [PATCH 07/10] fix

Signed-off-by: Pushkar Mishra
---
 fs-cache/Cargo.toml   |   1 +
 fs-cache/src/cache.rs | 249 ++++++++++++++++++++++++++++++++++--------
 2 files changed, 206 insertions(+), 44 deletions(-)

diff --git a/fs-cache/Cargo.toml b/fs-cache/Cargo.toml
index 0242f359..b70c582f 100644
--- a/fs-cache/Cargo.toml
+++ b/fs-cache/Cargo.toml
@@ -12,6 +12,7 @@ bench = false
 log = { version = "0.4.17", features = ["release_max_level_off"] }
 data-error = { path = "../data-error" }
 fs-storage = { path = "../fs-storage"}
+fs-atomic-light = { path = "../fs-atomic-light" }
 lru = "0.12.5"
 
diff --git a/fs-cache/src/cache.rs b/fs-cache/src/cache.rs
index 86b24428..27ffe74d 100644
--- a/fs-cache/src/cache.rs
+++ b/fs-cache/src/cache.rs
@@ -3,6 +3,7 @@ use std::num::NonZeroUsize;
 use std::path::{Path, PathBuf};
 
 use data_error::{ArklibError, Result};
+use fs_atomic_light::temp_and_move;
 use fs_storage::utils::extract_key_from_file_path;
 use lru::LruCache;
 
@@ -30,6 +31,8 @@ pub struct Cache<K, V> {
     current_memory_bytes: usize,
     /// The maximum allowable memory usage in bytes.
     max_memory_bytes: usize,
+    /// Whether to include values in debug logs
+    log_values: bool,
 }
 
 impl<K, V> Cache<K, V>
@@ -37,11 +40,18 @@ where
     K: Ord + Clone + std::fmt::Display + std::hash::Hash + std::str::FromStr,
     V: Clone + std::fmt::Debug + AsRef<[u8]> + From<Vec<u8>>,
 {
-    /// Creates a new cache with the given label, storage path, and memory limit.
+    /// Creates a new cache instance.
+    ///
+    /// # Arguments
+    /// * `label` - Identifier used in logs
+    /// * `path` - Directory where cache files are stored
+    /// * `max_memory_bytes` - Maximum bytes to keep in memory
+    /// * `log_values` - Whether to include values in debug logs
     pub fn new(
         label: String,
         path: &Path,
         max_memory_bytes: usize,
+        log_values: bool,
     ) -> Result<Self> {
         let mut cache = Self {
             label: label.clone(),
@@ -53,6 +63,7 @@ where
             ),
             current_memory_bytes: 0,
             max_memory_bytes,
+            log_values,
         };
 
         log::debug!(
@@ -65,36 +76,85 @@ where
         Ok(cache)
     }
 
-    /// Retrieves a value by key from memory cache or disk, returns None if not found.
+    /// Retrieves a value by its key, checking memory first then disk.
+    /// Returns None if the key doesn't exist.
     pub fn get(&mut self, key: &K) -> Option<V> {
         log::debug!("cache/{}: retrieving value for key {}", self.label, key);
-        if let Some(value) = self.get_from_memory(key) {
-            return Some(value);
+        let value = self
+            .fetch_from_memory(key)
+            .or_else(|| self.fetch_from_disk(key));
+
+        match &value {
+            Some(v) => {
+                if self.log_values {
+                    log::debug!(
+                        "cache/{}: found value for key {}: {:?}",
+                        self.label,
+                        key,
+                        v
+                    );
+                }
+            }
+            None => {
+                log::warn!(
+                    "cache/{}: no value found for key {}",
+                    self.label,
+                    key
+                );
+            }
         }
 
-        self.get_from_disk(key)
+        value
     }
 
-    /// Stores a value with the given key, writing to disk and updating memory cache.
+    /// Stores a new value with the given key.
+    /// Returns error if the key already exists or if writing fails.
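+    ///
+    /// Illustrative sketch (not from the original patch), assuming a
+    /// `Cache<String, Vec<u8>>`:
+    ///
+    /// ```ignore
+    /// cache.set("blob".to_string(), vec![1u8, 2, 3]).expect("first set succeeds");
+    /// // A second set with the same key is rejected:
+    /// assert!(cache.set("blob".to_string(), vec![4u8]).is_err());
+    /// ```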
     pub fn set(&mut self, key: K, value: V) -> Result<()> {
         log::debug!("cache/{}: setting value for key {}", self.label, key);
+
         // Check if value already exists
-        if self.get(&key).is_some() {
-            log::debug!("cache/{}: skipping existing key {}", self.label, key);
-            return Ok(());
+        if self.exists(&key) {
+            return Err(ArklibError::Storage(
+                self.label.clone(),
+                format!("Key {} already exists in cache", key),
+            ));
         }
 
         // Always write to disk first
-        self.write_to_disk(&key, &value)?;
+        self.persist_to_disk(&key, &value)?;
 
         // Then update memory cache
-        self.cache_in_memory(&key, &value)?;
+        self.update_memory_cache(&key, &value)?;
 
         log::debug!("cache/{}: set key={}", self.label, key);
         Ok(())
     }
 
+    /// Checks if a value exists either in memory or on disk.
+    pub fn exists(&self, key: &K) -> bool {
+        self.memory_cache.contains(key)
+            || self.path.join(key.to_string()).exists()
+    }
+
+    /// Returns an iterator over all keys present on disk.
+    pub fn keys(&self) -> Result<impl Iterator<Item = K>> {
+        let keys: Vec<K> = fs::read_dir(&self.path)?
+            .filter_map(|entry| entry.ok())
+            .filter_map(|entry| {
+                let path = entry.path();
+                if path.is_file() {
+                    extract_key_from_file_path(&self.label, &path, false).ok()
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        Ok(keys.into_iter())
+    }
+
+    // Internal Methods:
     fn load_fs(&mut self) -> Result<()> {
         if !self.path.exists() {
             return Err(ArklibError::Storage(
@@ -135,7 +195,7 @@ where
         for (key, approx_size) in file_metadata {
             if loaded_bytes + approx_size <= self.max_memory_bytes {
-                match self.load_value_from_disk(&key) {
+                match self.read_from_disk(&key) {
                     Ok(value) => {
                         // let actual_size = Self::estimate_size(&value);
                         let actual_size = self.get_file_size(&key)?;
@@ -174,22 +234,24 @@ where
         Ok(())
     }
 
-    fn get_from_memory(&mut self, key: &K) -> Option<V> {
+    // Retrieves a value from the memory cache.
+    fn fetch_from_memory(&mut self, key: &K) -> Option<V> {
         self.memory_cache
             .get(key)
             .map(|entry| entry.value.clone())
     }
 
-    fn get_from_disk(&mut self, key: &K) -> Option<V> {
+    // Retrieves a value from disk and caches it in memory if possible.
+    fn fetch_from_disk(&mut self, key: &K) -> Option<V> {
         let file_path = self.path.join(key.to_string());
         if !file_path.exists() {
             log::warn!("cache/{}: no value found for key {}", self.label, key);
             return None;
         }
 
-        match self.load_value_from_disk(key) {
+        match self.read_from_disk(key) {
             Ok(value) => {
-                if let Err(err) = self.cache_in_memory(key, &value) {
+                if let Err(err) = self.update_memory_cache(key, &value) {
                     log::error!(
                         "cache/{}: failed to add to memory cache for key {}: {}",
                         self.label,
                         key,
                         err
                     );
@@ -212,7 +274,8 @@ where
         }
     }
 
-    fn write_to_disk(&mut self, key: &K, value: &V) -> Result<()> {
+    // Writes a value to disk using atomic operations.
+    fn persist_to_disk(&mut self, key: &K, value: &V) -> Result<()> {
         log::debug!("cache/{}: writing to disk for key {}", self.label, key);
         fs::create_dir_all(&self.path)?;
@@ -223,15 +286,17 @@ where
             file_path.display()
         );
 
-        fs::write(&file_path, value.as_ref()).map_err(|err| {
-            ArklibError::Storage(
-                self.label.clone(),
-                format!("Failed to write value for key {}: {}", key, err),
-            )
-        })
+        temp_and_move(value.as_ref(), self.path.clone(), &key.to_string())
+            .map_err(|err| {
+                ArklibError::Storage(
+                    self.label.clone(),
+                    format!("Failed to write value for key {}: {}", key, err),
+                )
+            })
     }
 
-    fn load_value_from_disk(&self, key: &K) -> Result<V>
+    // Reads a value from disk.
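+    // Values are persisted as raw bytes (see `persist_to_disk`), so this is
+    // a plain `fs::read` followed by the `From<Vec<u8>>` conversion; the
+    // cache itself assumes no serialization format.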
+    fn read_from_disk(&self, key: &K) -> Result<V>
     where
         V: From<Vec<u8>>, // Add trait bound for reading binary data
     {
         let file_path = self.path.join(key.to_string());
         let contents = fs::read(&file_path)?;
         Ok(V::from(contents))
     }
 
+    // Returns the size of a value in bytes.
+    //
+    // First checks the memory cache for size information to avoid disk access.
+    // Falls back to checking the file size on disk if not found in memory.
     fn get_file_size(&self, key: &K) -> Result<usize> {
-        Ok(fs::metadata(self.path.join(key.to_string()))?.len() as usize)
+        if let Some(entry) = self.memory_cache.peek(key) {
+            return Ok(entry.size);
+        }
+
+        let file_path = self.path.join(key.to_string());
+        fs::metadata(&file_path)
+            .map(|m| m.len() as usize)
+            .map_err(|err| {
+                ArklibError::Storage(
+                    self.label.clone(),
+                    format!("Failed to get size for key {}: {}", key, err),
+                )
+            })
     }
 
-    fn cache_in_memory(&mut self, key: &K, value: &V) -> Result<()> {
+    // Adds or updates a value in the memory cache, evicting old entries if needed.
+    // Values larger than the memory limit are skipped and remain on disk only.
+    fn update_memory_cache(&mut self, key: &K, value: &V) -> Result<()> {
         log::debug!("cache/{}: caching in memory for key {}", self.label, key);
         let size = self.get_file_size(key)?;
 
         // If single value is larger than total limit, just skip memory caching
         if size > self.max_memory_bytes {
-            return Err(ArklibError::Storage(
-                self.label.clone(),
-                format!(
-                    "value size {} exceeds limit {}",
-                    size, self.max_memory_bytes
-                ),
-            ));
+            log::error!(
+                "cache/{}: value size {} exceeds limit {}",
+                self.label,
+                size,
+                self.max_memory_bytes
+            );
+            return Ok(());
         }
 
         // Remove oldest entries until we have space for new value
@@ -330,6 +413,7 @@ mod tests {
             "test".to_string(),
             temp_dir.path(),
             1024 * 1024, // 1MB
+            false,
         )
         .expect("Failed to create cache")
     }
@@ -355,11 +439,51 @@ mod tests {
         assert_eq!(retrieved.0, value.0);
     }
 
+    #[rstest]
+    fn test_exists(mut cache: Cache<String, TestValue>) {
+        let key = "test_key".to_string();
+        let value = TestValue(vec![1, 2, 3, 4]);
+
+        assert!(!cache.exists(&key));
+        cache
+            .set(key.clone(), value)
+            .expect("Failed to set value");
+        assert!(cache.exists(&key));
+    }
+
     #[rstest]
     fn test_get_nonexistent(mut cache: Cache<String, TestValue>) {
         assert!(cache.get(&"nonexistent".to_string()).is_none());
     }
 
+    #[rstest]
+    fn test_keys(mut cache: Cache<String, TestValue>) {
+        let values = vec![
+            ("key1".to_string(), vec![1, 2]),
+            ("key2".to_string(), vec![3, 4]),
+            ("key3".to_string(), vec![5, 6]),
+        ];
+
+        // Add values
+        for (key, data) in values.iter() {
+            cache
+                .set(key.clone(), TestValue(data.clone()))
+                .expect("Failed to set value");
+        }
+
+        // Check keys
+        let mut cache_keys: Vec<_> = cache
+            .keys()
+            .expect("Failed to get keys")
+            .collect();
+        cache_keys.sort();
+        let mut expected_keys: Vec<_> =
+            values.iter().map(|(k, _)| k.clone()).collect();
+        expected_keys.sort();
+
+        assert_eq!(cache_keys, expected_keys);
+    }
+
     #[rstest]
     fn test_memory_eviction(temp_dir: TempDir) {
         // Create cache with small memory limit
         let mut cache = Cache::new(
             "test".to_string(),
             temp_dir.path(),
             5, // Very small limit to force eviction
+            false,
         )
         .expect("Failed to create cache");
@@ -391,14 +516,14 @@ mod tests {
     #[rstest]
     fn test_large_value_handling(mut cache: Cache<String, TestValue>) {
-        // let (mut cache, _dir) = setup_temp_cache();
         let key = "large_key".to_string();
         let large_value = TestValue(vec![0; 2 * 1024 * 1024]); // 2MB, larger than cache
 
         // Should fail to cache in memory but succeed in writing to disk
         assert!(cache
             .set(key.clone(), large_value.clone())
-            .is_err());
+            .is_ok());
+        assert_eq!(cache.get(&key).unwrap(), large_value); // Should load from disk
     }
 
     #[rstest]
     fn test_persistence(temp_dir: TempDir) {
         let key = "test_key".to_string();
         let value = TestValue(vec![1, 2, 3, 4]);
 
         // Scope for first cache instance
         {
             let mut cache =
-                Cache::new("test".to_string(), temp_dir.path(), 1024)
+                Cache::new("test".to_string(), temp_dir.path(), 1024, false)
                     .expect("Failed to create first cache");
             cache
                 .set(key.clone(), value.clone())
                 .expect("Failed to set value");
         }
 
         // Create new cache instance pointing to same directory
-        let mut cache2 = Cache::new("test".to_string(), temp_dir.path(), 1024)
-            .expect("Failed to create second cache");
+        let mut cache2 =
+            Cache::new("test".to_string(), temp_dir.path(), 1024, false)
+                .expect("Failed to create second cache");
 
         // Should be able to read value written by first instance
         let retrieved: TestValue =
             cache2.get(&key).expect("Failed to get value");
         assert_eq!(retrieved.0, value.0);
     }
 
+    #[rstest]
+    fn test_concurrent_reads(temp_dir: TempDir) {
+        use std::thread;
+
+        let key = "test_key".to_string();
+        let value = TestValue(vec![1, 2, 3, 4]);
+
+        // Set up initial cache with data
+        let mut cache =
+            Cache::new("test".to_string(), temp_dir.path(), 1024, false)
+                .expect("Failed to create cache");
+        cache
+            .set(key.clone(), value.clone())
+            .expect("Failed to set value");
+
+        // Create multiple reader caches
+        let mut handles: Vec<thread::JoinHandle<Option<TestValue>>> = vec![];
+        for _ in 0..3 {
+            let key = key.clone();
+            let cache_path = temp_dir.path().to_path_buf();
+
+            handles.push(thread::spawn(move || {
+                let mut reader_cache =
+                    Cache::new("test".to_string(), &cache_path, 1024, false)
+                        .expect("Failed to create reader cache");
+
+                reader_cache.get(&key)
+            }));
+        }
+
+        // All readers should get the same value
+        for handle in handles {
+            let result = handle.join().expect("Thread panicked");
+            assert_eq!(result.unwrap(), value);
+        }
+    }
+
     #[rstest]
     fn test_duplicate_set(mut cache: Cache<String, TestValue>) {
         let key = "dup_key".to_string();
         let value1 = TestValue(vec![1, 2, 3, 4]);
         let value2 = TestValue(vec![5, 6, 7, 8]);
 
         // First set
         cache
             .set(key.clone(), value1.clone())
             .expect("Failed to set first value");
 
-        // Second set with same key should be skipped
-        cache
-            .set(key.clone(), value2)
-            .expect("Failed to set second value");
+        // Second set with the same key should fail with an error
+        assert!(cache.set(key.clone(), value2).is_err());
 
         // Should still have first value
         let retrieved = cache.get(&key).expect("Failed to get value");
         assert_eq!(retrieved.0, value1.0);
     }
 
@@ -459,7 +620,7 @@ mod tests {
         // Create new cache instance to load existing files
         let mut cache2: Cache<String, Vec<u8>> =
-            Cache::new("test".to_string(), path, 1024)
+            Cache::new("test".to_string(), path, 1024, false)
                 .expect("Failed to create cache");
 
         // Check if files were loaded
@@ -471,7 +632,7 @@ mod tests {
     #[should_panic(expected = "Capacity can't be zero")]
     fn test_zero_capacity(temp_dir: TempDir) {
         let _cache: std::result::Result<Cache<String, TestValue>, ArklibError> =
-            Cache::new("test".to_string(), temp_dir.path(), 0);
+            Cache::new("test".to_string(), temp_dir.path(), 0, false);
     }
 
     #[rstest]

From a4b201ad2b76e931b7d346c5a2260d49c01100d8 Mon Sep 17 00:00:00 2001
From: Pushkar Mishra
Date: Sun, 1 Dec 2024 17:53:11 +0530
Subject: [PATCH 08/10] fix

Signed-off-by: Pushkar Mishra
---
 fs-cache/src/cache.rs | 120 +++++++++++++++++++++++-------------------
 1 file changed, 67 insertions(+), 53 deletions(-)

diff --git a/fs-cache/src/cache.rs b/fs-cache/src/cache.rs
index 27ffe74d..8abb9f39 100644
--- a/fs-cache/src/cache.rs
+++ b/fs-cache/src/cache.rs
@@ -37,7 +37,12 @@ impl<K, V> Cache<K, V>
 where
-    K: Ord + Clone + std::fmt::Display + std::hash::Hash + std::str::FromStr,
+    K: Ord
+        + Clone
+        + std::fmt::Display
+        + std::hash::Hash
+        + std::str::FromStr
+        + std::fmt::Debug,
     V: Clone + std::fmt::Debug + AsRef<[u8]> + From<Vec<u8>>,
 {
     /// Creates a new cache instance.
@@ -144,7 +149,7 @@ where
             .filter_map(|entry| {
                 let path = entry.path();
                 if path.is_file() {
-                    extract_key_from_file_path(&self.label, &path, false).ok()
+                    extract_key_from_file_path(&self.label, &path, true).ok()
                 } else {
                     None
                 }
@@ -170,19 +175,23 @@ where
             ));
         }
 
-        // First pass: collect metadata only
+        // Collect metadata for all files
         let mut file_metadata = Vec::new();
         for entry in fs::read_dir(&self.path)? {
-            let path = entry?.path();
+            let entry = entry?;
+            let path = entry.path();
             if path.is_file() {
                 let key: K =
                     extract_key_from_file_path(&self.label, &path, true)?;
-                file_metadata.push((key.clone(), self.get_file_size(&key)?));
+                let metadata = entry.metadata()?;
+                let modified = metadata.modified()?;
+                let size = metadata.len() as usize;
+                file_metadata.push((key, size, modified));
             }
         }
 
-        // Sort by size before loading
-        file_metadata.sort_by(|a, b| b.1.cmp(&a.1));
+        // Sort by modified time (most recent first)
+        file_metadata.sort_by(|a, b| b.2.cmp(&a.2));
 
         // Clear existing cache
         self.memory_cache.clear();
@@ -190,25 +199,18 @@ where
         // Load files that fit in memory
         let mut loaded_bytes = 0;
-        let total_bytes: usize =
-            file_metadata.iter().map(|(_, size)| size).sum();
+        let total_bytes: usize = file_metadata
+            .iter()
+            .map(|(_, size, _)| size)
+            .sum();
 
-        for (key, approx_size) in file_metadata {
-            if loaded_bytes + approx_size <= self.max_memory_bytes {
+        for (key, size, _) in file_metadata {
+            if loaded_bytes + size <= self.max_memory_bytes {
                 match self.read_from_disk(&key) {
                     Ok(value) => {
-                        // let actual_size = Self::estimate_size(&value);
-                        let actual_size = self.get_file_size(&key)?;
-                        if loaded_bytes + actual_size <= self.max_memory_bytes {
-                            self.memory_cache.put(
-                                key,
-                                CacheEntry {
-                                    value,
-                                    size: actual_size,
-                                },
-                            );
-                            loaded_bytes += actual_size;
-                        }
+                        self.memory_cache
+                            .put(key, CacheEntry { value, size });
+                        loaded_bytes += size;
                     }
                     Err(err) => {
                         log::warn!(
@@ -310,19 +312,7 @@ where
    // First checks the memory cache for size information to avoid disk access.
    // Falls back to checking the file size on disk if not found in memory.
     fn get_file_size(&self, key: &K) -> Result<usize> {
-        if let Some(entry) = self.memory_cache.peek(key) {
-            return Ok(entry.size);
-        }
-
-        let file_path = self.path.join(key.to_string());
-        fs::metadata(&file_path)
-            .map(|m| m.len() as usize)
-            .map_err(|err| {
-                ArklibError::Storage(
-                    self.label.clone(),
-                    format!("Failed to get size for key {}: {}", key, err),
-                )
-            })
+        Ok(fs::metadata(self.path.join(key.to_string()))?.len() as usize)
     }
 
     // Adds or updates a value in the memory cache, evicting old entries if needed.
@@ -383,7 +373,11 @@ where
 mod tests {
     use super::*;
     use rstest::{fixture, rstest};
-    use std::fs;
+    use std::{
+        fs::File,
+        io::Write,
+        time::{Duration, SystemTime},
+    };
     use tempdir::TempDir;
 
     // Helper struct that implements required traits
@@ -609,23 +603,43 @@ mod tests {
     }
 
     #[rstest]
-    fn test_load_fs(temp_dir: TempDir) {
-        let path = temp_dir.path();
-
-        // Manually create some files
-        fs::write(path.join("key1"), vec![1, 2, 3])
-            .expect("Failed to write file 1");
-        fs::write(path.join("key2"), vec![4, 5, 6])
-            .expect("Failed to write file 2");
-
-        // Create new cache instance to load existing files
-        let mut cache2: Cache<String, Vec<u8>> =
-            Cache::new("test".to_string(), path, 1024, false)
-                .expect("Failed to create cache");
-
-        // Check if files were loaded
-        assert!(cache2.get(&"key1".to_string()).is_some());
-        assert!(cache2.get(&"key2".to_string()).is_some());
+    fn test_loads_recent_files_first(temp_dir: TempDir) {
+        let mut cache: Cache<String, TestValue> = Cache::new(
+            "test".to_string(),
+            temp_dir.path(),
+            4, // Small limit to force selection
+            false,
+        )
+        .expect("Failed to create cache");
+
+        // Create files with different timestamps
+        let files = vec![
+            (
+                "old.txt",
+                vec![1, 2, 3],
+                SystemTime::now() - Duration::from_secs(100),
+            ),
+            ("new.txt", vec![3, 4], SystemTime::now()),
+        ];
+
+        for (name, data, time) in files {
+            let path = temp_dir.path().join(name);
+            let mut file = File::create(path).unwrap();
+            file.write_all(&data).unwrap();
+            file.set_modified(time).unwrap();
+            file.sync_all().unwrap();
+        }
+
+        // Reload cache
+        cache.load_fs().expect("Failed to load files");
+
+        // Verify newer file is in memory
+        assert!(cache
+            .memory_cache
+            .contains(&"new.txt".to_string()));
+        assert!(!cache
+            .memory_cache
+            .contains(&"old.txt".to_string()));
     }
 
     #[rstest]

From de954cdefc38ac77996b1855efa6aea93e20243a Mon Sep 17 00:00:00 2001
From: Pushkar Mishra
Date: Sun, 1 Dec 2024 17:55:52 +0530
Subject: [PATCH 09/10] fix

Signed-off-by: Pushkar Mishra
---
 fs-cache/src/cache.rs | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/fs-cache/src/cache.rs b/fs-cache/src/cache.rs
index 8abb9f39..28d5ce83 100644
--- a/fs-cache/src/cache.rs
+++ b/fs-cache/src/cache.rs
@@ -383,7 +373,11 @@ where mod tests { use super::*; use rstest::{fixture, rstest}; - use std::fs; + use std::{ + fs::File, + io::Write, + time::{Duration, SystemTime}, + }; use tempdir::TempDir; // Helper struct that implements required traits @@ -609,23 +603,43 @@ mod tests { } #[rstest] - fn test_load_fs(temp_dir: TempDir) { - let path = temp_dir.path(); - - // Manually create some files - fs::write(path.join("key1"), vec![1, 2, 3]) - .expect("Failed to write file 1"); - fs::write(path.join("key2"), vec![4, 5, 6]) - .expect("Failed to write file 2"); - - // Create new cache instance to load existing files - let mut cache2: Cache = - Cache::new("test".to_string(), path, 1024, false) - .expect("Failed to create cache"); + fn test_loads_recent_files_first(temp_dir: TempDir) { + let mut cache: Cache = Cache::new( + "test".to_string(), + temp_dir.path(), + 4, // Small limit to force selection + false, + ) + .expect("Failed to create cache"); + + // Create files with different timestamps + let files = vec![ + ( + "old.txt", + vec![1, 2, 3], + SystemTime::now() - Duration::from_secs(100), + ), + ("new.txt", vec![3, 4], SystemTime::now()), + ]; + + for (name, data, time) in files { + let path = temp_dir.path().join(name); + let mut file = File::create(path).unwrap(); + file.write_all(&data).unwrap(); + file.set_modified(time).unwrap(); + file.sync_all().unwrap(); + } - // Check if files were loaded - assert!(cache2.get(&"key1".to_string()).is_some()); - assert!(cache2.get(&"key2".to_string()).is_some()); + // Reload cache + cache.load_fs().expect("Failed to load files"); + + // Verify newer file is in memory + assert!(cache + .memory_cache + .contains(&"new.txt".to_string())); + assert!(!cache + .memory_cache + .contains(&"old.txt".to_string())); } #[rstest] From de954cdefc38ac77996b1855efa6aea93e20243a Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Sun, 1 Dec 2024 17:55:52 +0530 Subject: [PATCH 09/10] fix Signed-off-by: Pushkar Mishra --- fs-cache/src/cache.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/fs-cache/src/cache.rs b/fs-cache/src/cache.rs index 8abb9f39..28d5ce83 100644 --- a/fs-cache/src/cache.rs +++ b/fs-cache/src/cache.rs @@ -480,25 +480,24 @@ mod tests { #[rstest] fn test_memory_eviction(temp_dir: TempDir) { - // Create cache with small memory limit let mut cache = Cache::new( "test".to_string(), temp_dir.path(), - 5, // Very small limit to force eviction + 8, // Very small limit to force eviction false, ) .expect("Failed to create cache"); // Add first value - let key1 = "key1".to_string(); - let value1 = TestValue(vec![1, 2, 3, 4]); + let key1 = "key1.txt".to_string(); + let value1 = TestValue(vec![1, 2, 3, 4, 5, 7]); cache .set(key1.clone(), value1.clone()) .expect("Failed to set value1"); // Add second value to trigger eviction - let key2 = "key2".to_string(); - let value2 = TestValue(vec![5, 6, 7, 8]); + let key2 = "key2.json".to_string(); + let value2 = TestValue(vec![5, 6, 8]); cache .set(key2.clone(), value2.clone()) .expect("Failed to set value2"); From 9136354a72ecb4e73c904cda120a93f97bd77c84 Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Sun, 1 Dec 2024 18:05:27 +0530 Subject: [PATCH 10/10] improve comments Signed-off-by: Pushkar Mishra --- fs-cache/src/cache.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/fs-cache/src/cache.rs b/fs-cache/src/cache.rs index 28d5ce83..fb9881e8 100644 --- a/fs-cache/src/cache.rs +++ b/fs-cache/src/cache.rs @@ -159,7 +159,12 @@ where 
         Ok(keys.into_iter())
     }
 
-    // Internal Methods:
+    /// Internal Methods:
+    /// Initializes the memory cache by loading the most recently modified
+    /// files up to the memory limit.
+    ///
+    /// First collects metadata for all files, sorts them by modification
+    /// time, and then loads as many recent files as possible within the
+    /// memory limit. Files that don't fit in memory remain only on disk.
     fn load_fs(&mut self) -> Result<()> {
         if !self.path.exists() {
             return Err(ArklibError::Storage(
@@ -236,14 +241,14 @@ where
         Ok(())
     }
 
-    // Retrieves a value from the memory cache.
+    /// Retrieves a value from the memory cache.
     fn fetch_from_memory(&mut self, key: &K) -> Option<V> {
         self.memory_cache
             .get(key)
             .map(|entry| entry.value.clone())
     }
 
-    // Retrieves a value from disk and caches it in memory if possible.
+    /// Retrieves a value from disk and caches it in memory if possible.
     fn fetch_from_disk(&mut self, key: &K) -> Option<V> {
         let file_path = self.path.join(key.to_string());
@@ -276,7 +281,7 @@ where
         }
     }
 
-    // Writes a value to disk using atomic operations.
+    /// Writes a value to disk using atomic operations.
     fn persist_to_disk(&mut self, key: &K, value: &V) -> Result<()> {
         log::debug!("cache/{}: writing to disk for key {}", self.label, key);
         fs::create_dir_all(&self.path)?;
@@ -297,7 +302,7 @@ where
             })
     }
 
-    // Reads a value from disk.
+    /// Reads a value from disk.
     fn read_from_disk(&self, key: &K) -> Result<V>
     where
         V: From<Vec<u8>>, // Add trait bound for reading binary data
@@ -307,16 +312,16 @@ where
         Ok(V::from(contents))
     }
 
-    // Returns the size of a value in bytes.
-    //
-    // First checks the memory cache for size information to avoid disk access.
-    // Falls back to checking the file size on disk if not found in memory.
+    /// Returns the size of a value in bytes, as reported by the file's
+    /// metadata on disk.
     fn get_file_size(&self, key: &K) -> Result<usize> {
         Ok(fs::metadata(self.path.join(key.to_string()))?.len() as usize)
     }
 
-    // Adds or updates a value in the memory cache, evicting old entries if needed.
-    // Values larger than the memory limit are skipped and remain on disk only.
+    /// Adds or updates a value in the memory cache, evicting old entries if
+    /// needed. Values larger than the memory limit are skipped and remain
+    /// available on disk only.
     fn update_memory_cache(&mut self, key: &K, value: &V) -> Result<()> {
         log::debug!("cache/{}: caching in memory for key {}", self.label, key);
         let size = self.get_file_size(key)?;
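
A minimal end-to-end sketch of how the resulting `fs-cache` API fits together
after this series (the module path `fs_cache::cache::Cache`, the `Vec<u8>`
value type, and the target directory are illustrative assumptions, not code
taken from the patches):

    use std::path::Path;
    use fs_cache::cache::Cache;

    fn main() {
        // 1 MiB in-memory budget; every value is written to disk first, and
        // only values that fit the budget are mirrored in memory.
        let mut cache: Cache<String, Vec<u8>> = Cache::new(
            "thumbnails".to_string(),    // label used in log messages
            Path::new("/tmp/ark-cache"), // assumed storage directory
            1024 * 1024,                 // max_memory_bytes
            false,                       // log_values: keep values out of logs
        )
        .expect("failed to create cache");

        cache
            .set("img1".to_string(), vec![0xDE, 0xAD])
            .expect("failed to persist value");
        assert_eq!(cache.get(&"img1".to_string()), Some(vec![0xDE, 0xAD]));

        // Keys are recovered from the file names on disk.
        for key in cache.keys().expect("failed to list keys") {
            println!("cached: {}", key);
        }
    }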