feat(cache): introduce tiered cache abstraction (risingwavelabs#4406)
* feat(cache): introduce tiered cache abstraction

* refactor write batch design

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
2 people authored and nasnoisaac committed Aug 9, 2022
1 parent 87f7fb1 commit 29fd1ad
Showing 11 changed files with 520 additions and 187 deletions.
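The two files shown below switch from the file-cache-specific `CacheKey` trait to a pair of generic traits, `TieredCacheKey` and `TieredCacheValue`, and make `FileCache` and its write buffers generic over both. The trait definitions themselves live in files that are not expanded on this page; the sketch below is reconstructed only from the method signatures used in the diffs that follow, so the supertrait bounds and the key's `encode`/`decode` signatures are assumptions rather than the actual definitions.

```rust
// Rough sketch of the abstraction introduced by this commit, inferred from the
// impls in the diffs below. The real definitions (in risingwave_storage::hummock,
// not shown on this page) may differ in bounds and method signatures.
pub trait TieredCacheKey: Eq + std::hash::Hash + Clone {
    /// Fixed number of bytes an encoded key occupies.
    fn encoded_len() -> usize;
    /// Serialize the key into `buf` (assumed signature; the Index impl body is collapsed).
    fn encode(&self, buf: &mut [u8]);
    /// Rebuild the key from its encoded bytes (assumed signature).
    fn decode(buf: &[u8]) -> Self;
}

pub trait TieredCacheValue: Clone {
    /// Logical size of the value, used for charging the in-memory buffers.
    fn len(&self) -> usize;
    /// Number of bytes the encoded value occupies.
    fn encoded_len(&self) -> usize;
    /// Serialize the value into `buf`.
    fn encode(&self, buf: &mut [u8]);
    /// Rebuild the value from its encoded bytes.
    fn decode(buf: Vec<u8>) -> Self;
}
```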
32 changes: 26 additions & 6 deletions src/bench/file_cache_bench/bench.rs
@@ -21,7 +21,7 @@ use clap::Parser;
 use itertools::Itertools;
 use rand::{Rng, SeedableRng};
 use risingwave_storage::hummock::file_cache::cache::{FileCache, FileCacheOptions};
-use risingwave_storage::hummock::file_cache::coding::CacheKey;
+use risingwave_storage::hummock::{TieredCacheKey, TieredCacheValue};
 use tokio::sync::oneshot;
 
 use crate::analyze::{analyze, monitor, Hook, Metrics};
@@ -63,7 +63,7 @@ struct Index {
     idx: u32,
 }
 
-impl CacheKey for Index {
+impl TieredCacheKey for Index {
     fn encoded_len() -> usize {
         8
     }
@@ -91,11 +91,10 @@ pub async fn run() {
         capacity: args.capacity,
         total_buffer_capacity: args.total_buffer_capacity,
         cache_file_fallocate_unit: args.cache_file_fallocate_unit,
-        filters: vec![],
         flush_buffer_hooks: vec![hook],
     };
 
-    let cache: FileCache<Index> = FileCache::open(options).await.unwrap();
+    let cache: FileCache<Index, CacheValue> = FileCache::open(options).await.unwrap();
 
     let iostat_path = dev_stat_path(&args.path);
 
@@ -166,10 +165,31 @@ pub async fn run() {
     tokio::time::sleep(Duration::from_millis(300)).await;
 }
 
+#[derive(PartialEq, Eq, Clone, Debug)]
+struct CacheValue(Vec<u8>);
+
+impl TieredCacheValue for CacheValue {
+    fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    fn encoded_len(&self) -> usize {
+        self.len()
+    }
+
+    fn encode(&self, mut buf: &mut [u8]) {
+        buf.put_slice(&self.0)
+    }
+
+    fn decode(buf: Vec<u8>) -> Self {
+        Self(buf)
+    }
+}
+
 async fn bench(
     id: usize,
     args: Args,
-    cache: FileCache<Index>,
+    cache: FileCache<Index, CacheValue>,
     time: u64,
     metrics: Metrics,
     mut stop: oneshot::Receiver<()>,
@@ -195,7 +215,7 @@ async fn bench(
         for _ in 0..args.write {
             idx += 1;
             let key = Index { sst, idx };
-            let value = vec![b'x'; args.bs];
+            let value = CacheValue(vec![b'x'; args.bs]);
 
             let start = Instant::now();
             cache.insert(key, value).unwrap();
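The `Index` key above reports a fixed `encoded_len()` of 8 bytes (its two `u32` fields), but its `encode`/`decode` bodies are collapsed in this view. The standalone sketch below illustrates such a fixed-width key codec; the field order and big-endian byte order are assumptions, not the actual implementation.

```rust
use bytes::{Buf, BufMut};

// Hypothetical fixed-width codec for a two-field key like `Index` above.
// Field order and endianness are assumptions; the real impl is collapsed
// in this diff.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
struct Index {
    sst: u32,
    idx: u32,
}

impl Index {
    const ENCODED_LEN: usize = 8;

    fn encode(&self, mut buf: &mut [u8]) {
        // BufMut is implemented for &mut [u8]; each put_u32 advances the slice,
        // just as CacheValue::encode above relies on put_slice.
        buf.put_u32(self.sst);
        buf.put_u32(self.idx);
    }

    fn decode(mut buf: &[u8]) -> Self {
        let sst = buf.get_u32();
        let idx = buf.get_u32();
        Self { sst, idx }
    }
}

fn main() {
    let key = Index { sst: 7, idx: 42 };
    let mut buf = vec![0u8; Index::ENCODED_LEN];
    key.encode(&mut buf[..]);
    assert_eq!(Index::decode(&buf[..]), key);
}
```

Presumably, a fixed encoded length is what lets the on-disk format locate keys at known offsets without per-entry length prefixes.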
58 changes: 37 additions & 21 deletions src/storage/src/hummock/file_cache/buffer.rs
@@ -17,41 +17,57 @@ use std::sync::Arc;
 use parking_lot::RwLock;
 use risingwave_common::cache::LruCache;
 
-use super::coding::CacheKey;
 use super::LRU_SHARD_BITS;
+use crate::hummock::{TieredCacheEntryHolder, TieredCacheKey, TieredCacheValue};
 
-pub type Buffer<K> = Arc<LruCache<K, Vec<u8>>>;
+pub type Buffer<K, V> = Arc<LruCache<K, V>>;
 
-struct TwoLevelBufferCore<K>
+struct TwoLevelBufferCore<K, V>
 where
-    K: CacheKey,
+    K: TieredCacheKey,
+    V: TieredCacheValue,
 {
-    active_buffer: Buffer<K>,
-    frozen_buffer: Buffer<K>,
+    active_buffer: Buffer<K, V>,
+    frozen_buffer: Buffer<K, V>,
 }
 
-impl<K> TwoLevelBufferCore<K>
+impl<K, V> TwoLevelBufferCore<K, V>
 where
-    K: CacheKey,
+    K: TieredCacheKey,
+    V: TieredCacheValue,
 {
     fn swap(&mut self) {
         // Swap fields of `&mut self` to avoid the borrow checker complaining.
         std::mem::swap(&mut self.active_buffer, &mut self.frozen_buffer);
     }
 }
 
-#[derive(Clone)]
-pub struct TwoLevelBuffer<K>
+pub struct TwoLevelBuffer<K, V>
 where
-    K: CacheKey,
+    K: TieredCacheKey,
+    V: TieredCacheValue,
 {
     capacity: usize,
-    core: Arc<RwLock<TwoLevelBufferCore<K>>>,
+    core: Arc<RwLock<TwoLevelBufferCore<K, V>>>,
 }
 
-impl<K> TwoLevelBuffer<K>
+impl<K, V> Clone for TwoLevelBuffer<K, V>
 where
-    K: CacheKey,
+    K: TieredCacheKey,
+    V: TieredCacheValue,
 {
+    fn clone(&self) -> Self {
+        Self {
+            capacity: self.capacity,
+            core: Arc::clone(&self.core),
+        }
+    }
+}
+
+impl<K, V> TwoLevelBuffer<K, V>
+where
+    K: TieredCacheKey,
+    V: TieredCacheValue,
+{
     pub fn new(capacity: usize) -> Self {
         Self {
@@ -63,18 +79,18 @@ where
         }
     }
 
-    pub fn insert(&self, hash: u64, key: K, charge: usize, value: Vec<u8>) {
+    pub fn insert(&self, hash: u64, key: K, charge: usize, value: V) {
         let core = self.core.read();
         core.active_buffer.insert(key, hash, charge, value);
     }
 
-    pub fn get(&self, hash: u64, key: &K) -> Option<Vec<u8>> {
+    pub fn get(&self, hash: u64, key: &K) -> Option<TieredCacheEntryHolder<K, V>> {
         let core = self.core.read();
         if let Some(entry) = core.active_buffer.lookup(hash, key) {
-            return Some(entry.value().clone());
+            return Some(TieredCacheEntryHolder::from_cached_value(entry));
        }
         if let Some(entry) = core.frozen_buffer.lookup(hash, key) {
-            return Some(entry.value().clone());
+            return Some(TieredCacheEntryHolder::from_cached_value(entry));
        }
         None
     }
@@ -85,19 +101,19 @@
         core.frozen_buffer.erase(hash, key);
     }
 
-    pub fn active(&self) -> Buffer<K> {
+    pub fn active(&self) -> Buffer<K, V> {
         self.core.read().active_buffer.clone()
     }
 
-    pub fn frozen(&self) -> Buffer<K> {
+    pub fn frozen(&self) -> Buffer<K, V> {
         self.core.read().frozen_buffer.clone()
     }
 
     pub fn swap(&self) {
         self.core.write().swap();
     }
 
-    pub fn rotate(&self) -> Buffer<K> {
+    pub fn rotate(&self) -> Buffer<K, V> {
         let mut buffer = Arc::new(LruCache::new(LRU_SHARD_BITS, self.capacity));
         let mut core = self.core.write();
         std::mem::swap(&mut buffer, &mut core.active_buffer);
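`TwoLevelBuffer` is a double-buffering scheme: inserts land in the active buffer, lookups consult the active then the frozen buffer (now returning a `TieredCacheEntryHolder` around the cached entry instead of cloning a `Vec<u8>`), and `rotate` installs a fresh active buffer so the previous one can be flushed to the file cache. The sketch below restates that pattern in a self-contained form, using plain `HashMap`s and `std::sync::RwLock` instead of the sharded `LruCache` and `parking_lot`; the tail of `rotate` is collapsed above, so the assumption here is that it hands back the previously frozen buffer.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, RwLock};

// Simplified stand-in for TwoLevelBuffer: no capacity accounting, no sharded
// LRU, and std's RwLock instead of parking_lot's. Field 0 is the active
// buffer, field 1 the frozen buffer.
struct TwoLevel<K, V> {
    core: Arc<RwLock<(HashMap<K, V>, HashMap<K, V>)>>,
}

impl<K: Hash + Eq, V: Clone> TwoLevel<K, V> {
    fn new() -> Self {
        Self {
            core: Arc::new(RwLock::new((HashMap::new(), HashMap::new()))),
        }
    }

    fn insert(&self, key: K, value: V) {
        // Writes always land in the active buffer.
        self.core.write().unwrap().0.insert(key, value);
    }

    fn get(&self, key: &K) -> Option<V> {
        // Reads check the active buffer first, then the frozen one.
        let core = self.core.read().unwrap();
        core.0.get(key).or_else(|| core.1.get(key)).cloned()
    }

    fn rotate(&self) -> HashMap<K, V> {
        // Freeze the current active buffer and return the previously frozen
        // one (assumed to be what the flusher then persists).
        let mut core = self.core.write().unwrap();
        let old_active = std::mem::take(&mut core.0);
        std::mem::replace(&mut core.1, old_active)
    }
}

fn main() {
    let buffer = TwoLevel::new();
    buffer.insert("a", 1);
    buffer.rotate(); // "a" moves from active to frozen, still readable
    assert_eq!(buffer.get(&"a"), Some(1));
    let to_flush = buffer.rotate(); // the frozen buffer is handed back
    assert_eq!(to_flush.get(&"a"), Some(&1));
    assert_eq!(buffer.get(&"a"), None);
}
```

In the real code the buffers are `Arc<LruCache>` handles, so rotation swaps cheap pointers rather than moving entries, and readers that already hold a buffer handle keep working while it is drained.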