Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvdb: no overlay #313

Merged
merged 20 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kvdb-memorydb/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog].
[Keep a Changelog]: http://keepachangelog.com/en/1.0.0/

## [Unreleased]
### Breaking
- Updated to the new `kvdb` interface. [#313](https://github.com/paritytech/parity-common/pull/313)

## [0.3.0] - 2019-01-03
- InMemory key-value database now can report memory used (via `MallocSizeOf`). [#292](https://github.com/paritytech/parity-common/pull/292)
Expand Down
5 changes: 1 addition & 4 deletions kvdb-memorydb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl KeyValueDB for InMemory {
}
}

fn write_buffered(&self, transaction: DBTransaction) {
fn write(&self, transaction: DBTransaction) -> io::Result<()> {
let mut columns = self.columns.write();
let ops = transaction.ops;
for op in ops {
Expand All @@ -77,9 +77,6 @@ impl KeyValueDB for InMemory {
}
}
}
}

fn flush(&self) -> io::Result<()> {
Ok(())
}

Expand Down
3 changes: 3 additions & 0 deletions kvdb-rocksdb/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog].
[Keep a Changelog]: http://keepachangelog.com/en/1.0.0/

## [Unreleased]
### Breaking
- Updated to the new `kvdb` interface. [#313](https://github.com/paritytech/parity-common/pull/313)


## [0.4.0] - 2019-01-03
- Add I/O statistics for RocksDB. [#294](https://github.com/paritytech/parity-common/pull/294)
Expand Down
1 change: 0 additions & 1 deletion kvdb-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ harness = false
[dependencies]
smallvec = "1.0.0"
fs-swap = "0.2.4"
interleaved-ordered = "0.1.1"
kvdb = { path = "../kvdb", version = "0.3" }
log = "0.4.8"
num_cpus = "1.10.1"
Expand Down
186 changes: 23 additions & 163 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ mod stats;
use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result};

use parity_util_mem::MallocSizeOf;
use parking_lot::{Mutex, MutexGuard, RwLock};
use parking_lot::RwLock;
use rocksdb::{
BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Error, Options, ReadOptions, WriteBatch, WriteOptions, DB,
};

use crate::iter::KeyValuePair;
use fs_swap::{swap, swap_nonatomic};
use interleaved_ordered::interleave_ordered;
use kvdb::{DBKey, DBOp, DBTransaction, DBValue, KeyValueDB};
use kvdb::{DBOp, DBTransaction, DBValue, KeyValueDB};
use log::{debug, warn};

#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -59,12 +58,6 @@ pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128;
/// The default memory budget in MiB.
pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512;

#[derive(MallocSizeOf)]
enum KeyState {
Insert(DBValue),
Delete,
}

/// Compaction profile for the database settings
/// Note, that changing these parameters may trigger
/// the compaction process of RocksDB on startup.
Expand Down Expand Up @@ -276,15 +269,8 @@ pub struct Database {
read_opts: ReadOptions,
#[ignore_malloc_size_of = "insignificant"]
block_opts: BlockBasedOptions,
// Dirty values added with `write_buffered`. Cleaned on `flush`.
overlay: RwLock<Vec<HashMap<DBKey, KeyState>>>,
#[ignore_malloc_size_of = "insignificant"]
stats: stats::RunningDbStats,
// Values currently being flushed. Cleared when `flush` completes.
flushing: RwLock<Vec<HashMap<DBKey, KeyState>>>,
// Prevents concurrent flushes.
// Value indicates if a flush is in progress.
flushing_lock: Mutex<bool>,
}

#[inline]
Expand Down Expand Up @@ -405,9 +391,6 @@ impl Database {
Ok(Database {
db: RwLock::new(Some(DBAndColumns { db, column_names })),
config: config.clone(),
overlay: RwLock::new((0..config.columns).map(|_| HashMap::new()).collect()),
flushing: RwLock::new((0..config.columns).map(|_| HashMap::new()).collect()),
flushing_lock: Mutex::new(false),
path: path.to_owned(),
read_opts,
write_opts,
Expand All @@ -421,75 +404,6 @@ impl Database {
DBTransaction::new()
}

/// Commit transaction to database.
pub fn write_buffered(&self, tr: DBTransaction) {
let mut overlay = self.overlay.write();
let ops = tr.ops;
for op in ops {
match op {
DBOp::Insert { col, key, value } => overlay[col as usize].insert(key, KeyState::Insert(value)),
DBOp::Delete { col, key } => overlay[col as usize].insert(key, KeyState::Delete),
};
}
}

/// Commit buffered changes to database. Must be called under `flush_lock`
fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<'_, bool>) -> io::Result<()> {
match *self.db.read() {
Some(ref cfs) => {
let mut batch = WriteBatch::default();
let mut ops: usize = 0;
let mut bytes: usize = 0;
mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
{
for (c, column) in self.flushing.read().iter().enumerate() {
ops += column.len();
for (key, state) in column.iter() {
let cf = cfs.cf(c);
match *state {
KeyState::Delete => {
bytes += key.len();
batch.delete_cf(cf, key).map_err(other_io_err)?
}
KeyState::Insert(ref value) => {
bytes += key.len() + value.len();
batch.put_cf(cf, key, value).map_err(other_io_err)?
}
};
}
}
}

check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts))?;
self.stats.tally_transactions(1);
self.stats.tally_writes(ops as u64);
self.stats.tally_bytes_written(bytes as u64);

for column in self.flushing.write().iter_mut() {
column.clear();
column.shrink_to_fit();
}
Ok(())
}
None => Err(other_io_err("Database is closed")),
}
}

/// Commit buffered changes to database.
pub fn flush(&self) -> io::Result<()> {
let mut lock = self.flushing_lock.lock();
// If RocksDB batch allocation fails the thread gets terminated and the lock is released.
// The value inside the lock is used to detect that.
if *lock {
// This can only happen if another flushing thread is terminated unexpectedly.
return Err(other_io_err("Database write failure. Running low on memory perhaps?"));
}
*lock = true;
let result = self.write_flushing_with_lock(&mut lock);
*lock = false;
result
}

/// Commit transaction to database.
pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
match *self.db.read() {
Expand All @@ -503,9 +417,6 @@ impl Database {
let mut stats_total_bytes = 0;

for op in ops {
// remove any buffered operation for this key
self.overlay.write()[op.col() as usize].remove(op.key());

let cf = cfs.cf(op.col() as usize);

match op {
Expand All @@ -532,84 +443,55 @@ impl Database {
pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
match *self.db.read() {
Some(ref cfs) => {
self.stats.tally_reads(1);
let guard = self.overlay.read();
let overlay =
guard.get(col as usize).ok_or_else(|| other_io_err("kvdb column index is out of bounds"))?;
match overlay.get(key) {
Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())),
Some(&KeyState::Delete) => Ok(None),
None => {
let flushing = &self.flushing.read()[col as usize];
match flushing.get(key) {
Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())),
Some(&KeyState::Delete) => Ok(None),
None => {
let acquired_val = cfs
.db
.get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts)
.map(|r| r.map(|v| v.to_vec()))
.map_err(other_io_err);

match acquired_val {
Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
_ => {}
};

acquired_val
}
}
}
if cfs.column_names.get(col as usize).is_none() {
return Err(other_io_err("column index is out of bounds"));
}
self.stats.tally_reads(1);
let acquired_val = cfs
ordian marked this conversation as resolved.
Show resolved Hide resolved
.db
.get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts)
.map(|r| r.map(|v| v.to_vec()))
.map_err(other_io_err);

match acquired_val {
Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
_ => {}
};

acquired_val
}
None => Ok(None),
}
}

/// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values.
// TODO: support prefix seek for unflushed data
/// Get value by partial key. Prefix size should match configured prefix size.
pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
self.iter_from_prefix(col, prefix).next().map(|(_, v)| v)
}

/// Get database iterator for flushed data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was probably lying before?
since it was interleaved with actual data?

Copy link
Member Author

@ordian ordian Jan 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partially, I guess
in order to support non-flushed data properly, it would have to take flushing into account (see e.g. get)

/// Get database iterator for the data.
ordian marked this conversation as resolved.
Show resolved Hide resolved
/// Will hold a lock until the iterator is dropped
/// preventing the database from being closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've probably asked this before, apologies, but why is it bad to close the DB while some thread is iterating over data? Is the assumption that threads iterating over some data must be allowed to complete?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good question, but I consider it to be part of the #314

pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = KeyValuePair> + 'a {
let read_lock = self.db.read();
let optional = if read_lock.is_some() {
let overlay_data = {
let overlay = &self.overlay.read()[col as usize];
let mut overlay_data = overlay
.iter()
.filter_map(|(k, v)| match *v {
KeyState::Insert(ref value) => {
Some((k.clone().into_vec().into_boxed_slice(), value.clone().into_boxed_slice()))
}
KeyState::Delete => None,
})
.collect::<Vec<_>>();
overlay_data.sort();
overlay_data
};

let guarded = iter::ReadGuardedIterator::new(read_lock, col, &self.read_opts);
Some(interleave_ordered(overlay_data, guarded))
Some(guarded)
} else {
None
};
optional.into_iter().flat_map(identity)
}

/// Get database iterator from prefix for flushed data.
/// Get database iterator from prefix for the data.
ordian marked this conversation as resolved.
Show resolved Hide resolved
/// Will hold a lock until the iterator is dropped
/// preventing the database from being closed.
fn iter_from_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = iter::KeyValuePair> + 'a {
let read_lock = self.db.read();
let optional = if read_lock.is_some() {
let guarded = iter::ReadGuardedIterator::new_from_prefix(read_lock, col, prefix, &self.read_opts);
Some(interleave_ordered(Vec::new(), guarded))
Some(guarded)
} else {
None
};
Expand All @@ -622,8 +504,6 @@ impl Database {
/// Close the database
fn close(&self) {
*self.db.write() = None;
self.overlay.write().clear();
self.flushing.write().clear();
}

/// Restore the database from a copy at given path.
Expand Down Expand Up @@ -657,8 +537,6 @@ impl Database {
// reopen the database and steal handles into self
let db = Self::open(&self.config, &self.path)?;
*self.db.write() = mem::replace(&mut *db.db.write(), None);
*self.overlay.write() = mem::replace(&mut *db.overlay.write(), Vec::new());
*self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new());
Ok(())
}

Expand All @@ -673,7 +551,6 @@ impl Database {
}

/// The number of keys in a column (estimated).
/// Does not take into account the unflushed data.
pub fn num_keys(&self, col: u32) -> io::Result<u64> {
const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
match *self.db.read() {
Expand Down Expand Up @@ -728,18 +605,10 @@ impl KeyValueDB for Database {
Database::get_by_prefix(self, col, prefix)
}

fn write_buffered(&self, transaction: DBTransaction) {
Database::write_buffered(self, transaction)
}

fn write(&self, transaction: DBTransaction) -> io::Result<()> {
Database::write(self, transaction)
}

fn flush(&self) -> io::Result<()> {
Database::flush(self)
}

fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = KeyValuePair> + 'a> {
let unboxed = Database::iter(self, col);
Box::new(unboxed.into_iter())
Expand Down Expand Up @@ -775,13 +644,6 @@ impl KeyValueDB for Database {
}
}

impl Drop for Database {
fn drop(&mut self) {
// write all buffered changes if we can.
let _ = self.flush();
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -857,8 +719,6 @@ mod tests {
}
db.write(batch).unwrap();

db.flush().unwrap();

{
let db = db.db.read();
db.as_ref().map(|db| {
Expand Down
Loading