feat(storage): support replicate batch (aka remote write) (#718)
Signed-off-by: Alex Chi <[email protected]>
skyzh authored Mar 8, 2022
1 parent 33d427e commit 81c9893
Showing 15 changed files with 310 additions and 69 deletions.
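
This commit adds a second ingestion path to the Hummock storage layer: alongside write_batch, which stages a batch in the shared buffer and later uploads it to the object store, the new replicate_batch (the "remote write" of the title) only stages the batch, so that a node replicating another node's writes can serve reads for that epoch without re-uploading data it does not own. The following Rust sketch is a self-contained toy model of that distinction; the type and method names are illustrative assumptions, not RisingWave code.

use std::collections::BTreeMap;

type Key = Vec<u8>;
type Value = Option<Vec<u8>>; // None models a delete tombstone

#[derive(Default)]
struct ToyStore {
    // epoch -> batches that are visible to local reads
    shared_buffer: BTreeMap<u64, Vec<(Key, Value)>>,
    // batches owned by this node, to be uploaded on sync()
    upload_queue: Vec<(u64, Vec<(Key, Value)>)>,
}

impl ToyStore {
    /// Local write: readable immediately and scheduled for upload.
    fn write_batch(&mut self, batch: Vec<(Key, Value)>, epoch: u64) {
        self.shared_buffer.entry(epoch).or_default().extend(batch.clone());
        self.upload_queue.push((epoch, batch));
    }

    /// Replicated remote write: readable immediately, never uploaded by this node.
    fn replicate_batch(&mut self, batch: Vec<(Key, Value)>, epoch: u64) {
        self.shared_buffer.entry(epoch).or_default().extend(batch);
    }

    /// Read the newest value for `key` at or below `epoch`.
    fn get(&self, key: &[u8], epoch: u64) -> Option<&Value> {
        self.shared_buffer
            .range(..=epoch)
            .rev() // newest epoch first, so later writes shadow earlier ones
            .find_map(|(_, batch)| {
                batch
                    .iter()
                    .rev()
                    .find(|(k, _)| k.as_slice() == key)
                    .map(|(_, v)| v)
            })
    }
}

fn main() {
    let mut store = ToyStore::default();
    store.write_batch(vec![(b"a".to_vec(), Some(b"1".to_vec()))], 1);
    store.replicate_batch(vec![(b"b".to_vec(), Some(b"2".to_vec()))], 1);

    // Both batches are visible to reads at epoch 1 ...
    assert_eq!(store.get(b"b", 1), Some(&Some(b"2".to_vec())));
    // ... but only the locally written one is queued for upload.
    assert_eq!(store.upload_queue.len(), 1);
}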
6 changes: 5 additions & 1 deletion rust/meta/src/hummock/integration_tests.rs
@@ -1,6 +1,7 @@
use std::iter::once;
use std::sync::Arc;

use bytes::Bytes;
use risingwave_pb::common::{HostAddress, WorkerType};
use risingwave_pb::hummock::checksum::Algorithm as ChecksumAlg;
use risingwave_storage::hummock::compactor::{Compactor, SubCompactContext};
@@ -90,7 +91,10 @@ async fn test_compaction_same_key_not_split() {
for _ in 0..kv_count {
storage
.write_batch(
once((b"same_key".to_vec(), HummockValue::Put(b"value".to_vec()))),
once((
Bytes::from(&b"same_key"[..]),
HummockValue::Put(Bytes::from(&b"value"[..])),
)),
epoch,
)
.await
33 changes: 30 additions & 3 deletions rust/storage/src/hummock/mod.rs
@@ -4,6 +4,7 @@ use std::fmt;
use std::ops::RangeBounds;
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;

mod block_cache;
@@ -188,7 +189,7 @@ impl HummockStorage {
.shared_buffer_manager
.get(key, (version.max_committed_epoch() + 1)..=epoch)
{
return Ok(v.into_put_value());
return Ok(v.into_put_value().map(|x| x.to_vec()));
}

for level in &version.levels() {
@@ -362,18 +363,44 @@ impl HummockStorage {
/// been committed. If such a case happens, the outcome is unpredictable.
pub async fn write_batch(
&self,
kv_pairs: impl Iterator<Item = (Vec<u8>, HummockValue<Vec<u8>>)>,
kv_pairs: impl Iterator<Item = (Bytes, HummockValue<Bytes>)>,
epoch: u64,
) -> HummockResult<()> {
let batch = kv_pairs
.map(|i| (FullKey::from_user_key(i.0, epoch).into_inner(), i.1))
.map(|i| {
(
Bytes::from(FullKey::from_user_key(i.0.to_vec(), epoch).into_inner()),
i.1,
)
})
.collect_vec();
self.shared_buffer_manager.write_batch(batch, epoch)?;

// self.sync(epoch).await?;
Ok(())
}

/// Replicate batch to shared buffer, without uploading to the storage backend.
pub async fn replicate_batch(
&self,
kv_pairs: impl Iterator<Item = (Bytes, HummockValue<Bytes>)>,
epoch: u64,
) -> HummockResult<()> {
let batch = kv_pairs
.map(|i| {
(
Bytes::from(FullKey::from_user_key(i.0.to_vec(), epoch).into_inner()),
i.1,
)
})
.collect_vec();
self.shared_buffer_manager
.replicate_remote_batch(batch, epoch)?;

// self.sync(epoch).await?;
Ok(())
}

pub async fn sync(&self, epoch: Option<u64>) -> HummockResult<()> {
self.shared_buffer_manager.sync(epoch).await
}
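
Both write_batch and the new replicate_batch above turn every user key into a full key that also carries the write epoch (FullKey::from_user_key(i.0.to_vec(), epoch)), which forces a copy of the Bytes key into a Vec<u8> before the encoded key is wrapped back into Bytes. The sketch below shows one common way such an epoch-suffixed encoding can work, with newer epochs sorting first for the same user key; it illustrates the idea only and is not necessarily the exact Hummock key layout.

/// Toy full-key encoding: user_key ++ big-endian(!epoch). Negating the epoch
/// makes larger (newer) epochs compare as smaller suffixes, so for the same
/// user key the newest version sorts first. This mirrors the role of
/// FullKey::from_user_key in the diff, not its exact byte layout.
fn full_key(user_key: &[u8], epoch: u64) -> Vec<u8> {
    let mut buf = Vec::with_capacity(user_key.len() + 8);
    buf.extend_from_slice(user_key);
    buf.extend_from_slice(&(!epoch).to_be_bytes());
    buf
}

/// Recover the user-key prefix from a toy full key.
fn user_key(full_key: &[u8]) -> &[u8] {
    &full_key[..full_key.len() - 8]
}

fn main() {
    let newer = full_key(b"same_key", 5);
    let older = full_key(b"same_key", 3);
    // Same user key: the write at the higher epoch sorts first.
    assert!(newer < older);
    assert_eq!(user_key(&newer), &b"same_key"[..]);
}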
70 changes: 53 additions & 17 deletions rust/storage/src/hummock/shared_buffer.rs
@@ -3,6 +3,7 @@ use std::ops::RangeBounds;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock as PLRwLock;
use risingwave_common::error::Result;
@@ -19,7 +20,7 @@ use super::value::HummockValue;
use super::{key, HummockError, HummockOptions, HummockResult, SstableStoreRef};
use crate::monitor::StateStoreStats;

type SharedBufferItem = (Vec<u8>, HummockValue<Vec<u8>>);
type SharedBufferItem = (Bytes, HummockValue<Bytes>);

/// A write batch stored in the shared buffer.
#[derive(Clone, Debug)]
@@ -42,9 +43,9 @@ impl SharedBufferBatch {
// user key.
match self
.inner
.binary_search_by(|m| key::user_key(m.0.as_slice()).cmp(user_key))
.binary_search_by(|m| key::user_key(&m.0).cmp(user_key))
{
Ok(i) => Some(self.inner[i].1.clone()),
Ok(i) => Some(self.inner[i].1.to_vec()),
Err(_) => None,
}
}
@@ -58,19 +59,19 @@ impl SharedBufferBatch {
}

pub fn start_key(&self) -> &[u8] {
self.inner.first().unwrap().0.as_slice()
&self.inner.first().unwrap().0
}

pub fn end_key(&self) -> &[u8] {
self.inner.last().unwrap().0.as_slice()
&self.inner.last().unwrap().0
}

pub fn start_user_key(&self) -> &[u8] {
key::user_key(self.inner.first().unwrap().0.as_slice())
key::user_key(&self.inner.first().unwrap().0)
}

pub fn end_user_key(&self) -> &[u8] {
key::user_key(self.inner.last().unwrap().0.as_slice())
key::user_key(&self.inner.last().unwrap().0)
}

pub fn epoch(&self) -> u64 {
@@ -111,7 +112,7 @@ impl<const DIRECTION: usize> HummockIterator for SharedBufferBatchIterator<DIREC
}

fn key(&self) -> &[u8] {
self.current_item().0.as_slice()
&self.current_item().0
}

fn value(&self) -> HummockValue<&[u8]> {
@@ -132,7 +133,7 @@ impl<const DIRECTION: usize> HummockIterator for SharedBufferBatchIterator<DIREC
// user key.
match self
.inner
.binary_search_by(|probe| key::user_key(probe.0.as_slice()).cmp(key::user_key(key)))
.binary_search_by(|probe| key::user_key(&probe.0).cmp(key::user_key(key)))
{
Ok(i) => {
self.current_idx = i;
@@ -195,6 +196,21 @@ impl SharedBufferManager {
.map_err(HummockError::shared_buffer_error)
}

/// Put a write batch into the shared buffer. The batch won't be synced to S3 asynchronously.
pub fn replicate_remote_batch(
&self,
batch: Vec<SharedBufferItem>,
epoch: u64,
) -> HummockResult<()> {
let batch = SharedBufferBatch::new(batch, epoch);
self.shared_buffer
.write()
.entry(epoch)
.or_insert(BTreeMap::new())
.insert(batch.end_user_key().to_vec(), batch.clone());
Ok(())
}

// TODO: support time-based syncing
pub async fn sync(&self, epoch: Option<u64>) -> HummockResult<()> {
let (tx, rx) = tokio::sync::oneshot::channel();
@@ -470,6 +486,9 @@ impl SharedBufferUploader {
mod tests {
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;

use super::SharedBufferBatch;
use crate::hummock::iterator::test_utils::{
iterator_test_key_of, iterator_test_key_of_epoch, test_value_of,
@@ -486,6 +505,15 @@ mod tests {
use crate::monitor::DEFAULT_STATE_STORE_STATS;
use crate::object::{InMemObjectStore, ObjectStore};

fn transform_shared_buffer(
batches: Vec<(Vec<u8>, HummockValue<Vec<u8>>)>,
) -> Vec<(Bytes, HummockValue<Bytes>)> {
batches
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect_vec()
}

#[tokio::test]
async fn test_shared_buffer_batch_basic() {
let epoch = 1;
@@ -503,18 +531,19 @@ async fn test_shared_buffer_batch_basic() {
HummockValue::Put(b"value3".to_vec()),
),
];
let shared_buffer_batch = SharedBufferBatch::new(shared_buffer_items.clone(), epoch);
let shared_buffer_batch =
SharedBufferBatch::new(transform_shared_buffer(shared_buffer_items.clone()), epoch);

// Sketch
assert_eq!(shared_buffer_batch.start_key(), shared_buffer_items[0].0);
assert_eq!(shared_buffer_batch.end_key(), shared_buffer_items[2].0);
assert_eq!(
shared_buffer_batch.start_user_key(),
user_key(shared_buffer_items[0].0.as_slice())
user_key(&shared_buffer_items[0].0)
);
assert_eq!(
shared_buffer_batch.end_user_key(),
user_key(shared_buffer_items[2].0.as_slice())
user_key(&shared_buffer_items[2].0)
);

// Point lookup
@@ -575,7 +604,8 @@ mod tests {
HummockValue::Put(b"value3".to_vec()),
),
];
let shared_buffer_batch = SharedBufferBatch::new(shared_buffer_items.clone(), epoch);
let shared_buffer_batch =
SharedBufferBatch::new(transform_shared_buffer(shared_buffer_items.clone()), epoch);

// Seek to 2nd key with current epoch, expect last two items to return
let mut iter = shared_buffer_batch.iter();
@@ -643,19 +673,25 @@ mod tests {
let mut shared_buffer_items = Vec::new();
for key in put_keys {
shared_buffer_items.push((
key_with_epoch(key.clone(), epoch),
HummockValue::Put(test_value_of(0, *idx)),
Bytes::from(key_with_epoch(key.clone(), epoch)),
HummockValue::Put(test_value_of(0, *idx).into()),
));
*idx += 1;
}
for key in delete_keys {
shared_buffer_items.push((key_with_epoch(key.clone(), epoch), HummockValue::Delete));
shared_buffer_items.push((
Bytes::from(key_with_epoch(key.clone(), epoch)),
HummockValue::Delete,
));
}
shared_buffer_items.sort_by(|l, r| user_key(l.0.as_slice()).cmp(r.0.as_slice()));
shared_buffer_items.sort_by(|l, r| user_key(&l.0).cmp(&r.0));
shared_buffer_manager
.write_batch(shared_buffer_items.clone(), epoch)
.unwrap();
shared_buffer_items
.iter()
.map(|(k, v)| (k.to_vec(), v.to_vec()))
.collect_vec()
}

#[tokio::test]
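
Inside a SharedBufferBatch the items stay sorted by full key, and get binary-searches on the user-key part of each entry only, ignoring the epoch suffix (the binary_search_by(|m| key::user_key(&m.0).cmp(user_key)) call above). replicate_remote_batch then indexes the whole batch under its end user key in the per-epoch map, just as write_batch does, but skips the upload path. A self-contained sketch of the point lookup, using an assumed 8-byte epoch suffix rather than the real Hummock key layout:

type Item = (Vec<u8>, Option<Vec<u8>>); // (full key, value); None = delete

/// Strip the assumed 8-byte epoch suffix to get the user key.
fn user_key(full_key: &[u8]) -> &[u8] {
    &full_key[..full_key.len() - 8]
}

/// Point lookup over a batch sorted by full key: binary-search on the
/// user-key prefix only, mirroring SharedBufferBatch::get in the diff.
fn batch_get<'a>(sorted_items: &'a [Item], target: &[u8]) -> Option<&'a Option<Vec<u8>>> {
    sorted_items
        .binary_search_by(|(k, _)| user_key(k).cmp(target))
        .ok()
        .map(|i| &sorted_items[i].1)
}

fn main() {
    // Full keys already carry an epoch suffix and are sorted.
    let items: Vec<Item> = vec![
        (b"aaa\0\0\0\0\0\0\0\x01".to_vec(), Some(b"1".to_vec())),
        (b"bbb\0\0\0\0\0\0\0\x01".to_vec(), None),
    ];
    assert_eq!(batch_get(&items, b"bbb"), Some(&None)); // hit: a delete tombstone
    assert_eq!(batch_get(&items, b"ccc"), None); // miss
}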
19 changes: 12 additions & 7 deletions rust/storage/src/hummock/state_store.rs
@@ -40,14 +40,8 @@ impl StateStore for HummockStateStore {

async fn ingest_batch(&self, kv_pairs: Vec<(Bytes, Option<Bytes>)>, epoch: u64) -> Result<()> {
self.storage
.write_batch(
kv_pairs
.into_iter()
.map(|(k, v)| (k.to_vec(), v.map(|x| x.to_vec()).into())),
epoch,
)
.write_batch(kv_pairs.into_iter().map(|(k, v)| (k, v.into())), epoch)
.await?;

Ok(())
}

@@ -82,6 +76,17 @@ impl StateStore for HummockStateStore {
self.storage.sync(epoch).await?;
Ok(())
}

async fn replicate_batch(
&self,
kv_pairs: Vec<(Bytes, Option<Bytes>)>,
epoch: u64,
) -> Result<()> {
self.storage
.replicate_batch(kv_pairs.into_iter().map(|(k, v)| (k, v.into())), epoch)
.await?;
Ok(())
}
}

pub struct HummockStateStoreIter<'a> {
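
HummockStateStore forwards both ingest_batch and the new replicate_batch to the storage layer, converting each Option<Bytes> value with v.into(). The From impl behind that conversion is not part of this diff; the toy below only models the mapping the call sites imply, with Some becoming a put and None a delete tombstone.

use bytes::Bytes;

/// Toy stand-in for HummockValue<Bytes>.
#[derive(Debug, PartialEq)]
enum Value {
    Put(Bytes),
    Delete,
}

/// Assumed mapping behind the `(k, v.into())` call sites in the diff.
impl From<Option<Bytes>> for Value {
    fn from(v: Option<Bytes>) -> Self {
        match v {
            Some(b) => Value::Put(b),
            None => Value::Delete,
        }
    }
}

fn main() {
    let kv_pairs: Vec<(Bytes, Option<Bytes>)> = vec![
        (Bytes::from_static(b"k1"), Some(Bytes::from_static(b"v1"))),
        (Bytes::from_static(b"k2"), None), // a deletion
    ];
    let converted: Vec<(Bytes, Value)> =
        kv_pairs.into_iter().map(|(k, v)| (k, v.into())).collect();
    assert_eq!(converted[1].1, Value::Delete);
}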
35 changes: 5 additions & 30 deletions rust/storage/src/hummock/state_store_tests.rs
@@ -117,12 +117,7 @@ async fn test_basic() {

// Write first batch.
hummock_storage
.write_batch(
batch1
.into_iter()
.map(|(k, v)| (k.to_vec(), v.map(|x| x.to_vec()).into())),
epoch1,
)
.write_batch(batch1.into_iter().map(|(k, v)| (k, v.into())), epoch1)
.await
.unwrap();

@@ -140,12 +135,7 @@ async fn test_basic() {
// Write second batch.
let epoch2 = epoch1 + 1;
hummock_storage
.write_batch(
batch2
.into_iter()
.map(|(k, v)| (k.to_vec(), v.map(|x| x.to_vec()).into())),
epoch2,
)
.write_batch(batch2.into_iter().map(|(k, v)| (k, v.into())), epoch2)
.await
.unwrap();

@@ -156,12 +146,7 @@ async fn test_basic() {
// Write third batch.
let epoch3 = epoch2 + 1;
hummock_storage
.write_batch(
batch3
.into_iter()
.map(|(k, v)| (k.to_vec(), v.map(|x| x.to_vec()).into())),
epoch3,
)
.write_batch(batch3.into_iter().map(|(k, v)| (k, v.into())), epoch3)
.await
.unwrap();

@@ -265,12 +250,7 @@ async fn test_reload_storage() {

// Write first batch.
hummock_storage
.write_batch(
batch1
.into_iter()
.map(|(k, v)| (k.to_vec(), v.map(|x| x.to_vec()).into())),
epoch1,
)
.write_batch(batch1.into_iter().map(|(k, v)| (k, v.into())), epoch1)
.await
.unwrap();

@@ -299,12 +279,7 @@ async fn test_reload_storage() {
// Write second batch.
let epoch2 = epoch1 + 1;
hummock_storage
.write_batch(
batch2
.into_iter()
.map(|(k, v)| (k.to_vec(), v.map(|x| x.to_vec()).into())),
epoch2,
)
.write_batch(batch2.into_iter().map(|(k, v)| (k, v.into())), epoch2)
.await
.unwrap();

(Diffs for the remaining 10 changed files are not expanded in this view.)