Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): support etcd as MetaStore backend #681

Merged
merged 6 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ clap = { version = "3", features = ["derive"] }
crc32fast = "1"
dashmap = "5"
either = "1"
etcd-client = "0.8"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hex = "0.4"
hyper = "0.14"
Expand Down Expand Up @@ -58,6 +59,7 @@ tracing-subscriber = "0.3"
uuid = { version = "0.8", features = ["v4"] }

[dev-dependencies]
assert_matches = "1"
rand = "0.8"

[[bin]]
Expand Down
22 changes: 20 additions & 2 deletions rust/meta/src/bin/meta_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ struct Opts {

#[clap(long)]
prometheus_host: Option<String>,

#[clap(long, default_value_t = String::from(""))]
backend: String,

#[clap(long, default_value_t = String::from(""))]
etcd_endpoints: String,
}

/// Configure log targets for all `RisingWave` crates. When new crates are added and TRACE level
Expand Down Expand Up @@ -61,9 +67,21 @@ async fn main() {
let addr = opts.host.parse().unwrap();
let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
let prometheus_addr = opts.prometheus_host.map(|x| x.parse().unwrap());
let backend = match opts.backend.as_str() {
"etcd" => MetaStoreBackend::Etcd {
endpoints: opts
.etcd_endpoints
.split(',')
.map(|x| x.to_string())
.collect(),
},
"mem" | "" => MetaStoreBackend::Mem,
_ => panic!("unknown backend"),
};

info!("Starting meta server at {}", addr);
let (join_handle, _shutdown_send) =
rpc_serve(addr, prometheus_addr, dashboard_addr, MetaStoreBackend::Mem).await;
let (join_handle, _shutdown_send) = rpc_serve(addr, prometheus_addr, dashboard_addr, backend)
.await
.unwrap();
join_handle.await.unwrap();
}
2 changes: 2 additions & 0 deletions rust/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#![feature(binary_heap_drain_sorted)]
#![feature(const_fn_trait_bound)]
#![feature(option_result_contains)]
#![feature(let_chains)]
#![feature(type_alias_impl_trait)]
#![feature(map_first_last)]

mod barrier;
Expand Down
46 changes: 37 additions & 9 deletions rust/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use etcd_client::{Client as EtcdClient, ConnectOptions};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
use risingwave_pb::meta::catalog_service_server::CatalogServiceServer;
use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
Expand All @@ -26,10 +30,11 @@ use crate::rpc::service::epoch_service::EpochServiceImpl;
use crate::rpc::service::heartbeat_service::HeartbeatServiceImpl;
use crate::rpc::service::hummock_service::HummockServiceImpl;
use crate::rpc::service::stream_service::StreamServiceImpl;
use crate::storage::MemStore;
use crate::storage::{EtcdMetaStore, MemStore, MetaStore};
use crate::stream::{FragmentManager, StreamManager};

pub enum MetaStoreBackend {
Etcd { endpoints: Vec<String> },
Mem,
}

Expand All @@ -38,14 +43,37 @@ pub async fn rpc_serve(
prometheus_addr: Option<SocketAddr>,
dashboard_addr: Option<SocketAddr>,
meta_store_backend: MetaStoreBackend,
) -> Result<(JoinHandle<()>, UnboundedSender<()>)> {
Ok(match meta_store_backend {
MetaStoreBackend::Etcd { endpoints } => {
let client = EtcdClient::connect(
endpoints,
Some(
ConnectOptions::default()
.with_keep_alive(Duration::from_secs(3), Duration::from_secs(5)),
),
)
.await
.map_err(|e| RwError::from(InternalError(format!("failed to connect etcd {}", e))))?;
let meta_store_ref = Arc::new(EtcdMetaStore::new(client));
rpc_serve_with_store(addr, prometheus_addr, dashboard_addr, meta_store_ref).await
}
MetaStoreBackend::Mem => {
let meta_store_ref = Arc::new(MemStore::default());
rpc_serve_with_store(addr, prometheus_addr, dashboard_addr, meta_store_ref).await
}
})
}

pub async fn rpc_serve_with_store<S: MetaStore>(
addr: SocketAddr,
prometheus_addr: Option<SocketAddr>,
dashboard_addr: Option<SocketAddr>,
meta_store_ref: Arc<S>,
) -> (JoinHandle<()>, UnboundedSender<()>) {
let listener = TcpListener::bind(addr).await.unwrap();
let meta_store_ref = match meta_store_backend {
MetaStoreBackend::Mem => Arc::new(MemStore::default()),
};
let epoch_generator_ref = Arc::new(MemEpochGenerator::new());
let env =
MetaSrvEnv::<MemStore>::new(meta_store_ref.clone(), epoch_generator_ref.clone()).await;
let env = MetaSrvEnv::<S>::new(meta_store_ref.clone(), epoch_generator_ref.clone()).await;

let fragment_manager = Arc::new(FragmentManager::new(meta_store_ref.clone()).await.unwrap());
let hummock_manager = Arc::new(hummock::HummockManager::new(env.clone()).await.unwrap());
Expand Down Expand Up @@ -105,9 +133,9 @@ pub async fn rpc_serve(

let epoch_srv = EpochServiceImpl::new(epoch_generator_ref.clone());
let heartbeat_srv = HeartbeatServiceImpl::new();
let catalog_srv = CatalogServiceImpl::<MemStore>::new(env.clone(), catalog_manager_ref);
let cluster_srv = ClusterServiceImpl::<MemStore>::new(cluster_manager.clone());
let stream_srv = StreamServiceImpl::<MemStore>::new(
let catalog_srv = CatalogServiceImpl::<S>::new(env.clone(), catalog_manager_ref);
let cluster_srv = ClusterServiceImpl::<S>::new(cluster_manager.clone());
let stream_srv = StreamServiceImpl::<S>::new(
stream_manager_ref,
fragment_manager.clone(),
cluster_manager,
Expand Down
216 changes: 216 additions & 0 deletions rust/meta/src/storage/etcd_meta_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
use std::sync::atomic::{self, AtomicI64};

use anyhow;
use async_trait::async_trait;
use etcd_client::{
Client, Compare, CompareOp, Error as EtcdError, GetOptions, KvClient, Txn, TxnOp,
};
use futures::Future;
use tokio::sync::Mutex;

use super::{Error, Key, MetaStore, Result, Snapshot, Transaction, Value};

impl From<EtcdError> for Error {
fn from(err: EtcdError) -> Self {
Error::Internal(anyhow::Error::new(err))
}
}

const REVISION_UNINITIALIZED: i64 = -1;

#[derive(Clone)]
pub struct EtcdMetaStore {
client: Client,
}
pub struct EtcdSnapshot {
client: KvClient,
revision: AtomicI64,
init_lock: Mutex<()>,
}

// TODO: we can refine the key encoding before release.
fn encode_etcd_key(cf: &str, key: &[u8]) -> Vec<u8> {
let mut encoded_key = Vec::with_capacity(key.len() + cf.len() + 1);
encoded_key.extend_from_slice(cf.as_bytes());
encoded_key.push(b'/');
encoded_key.extend_from_slice(key);
encoded_key
}

impl EtcdSnapshot {
async fn view_inner<V: SnapshotViewer>(&self, view: V) -> Result<V::Output> {
loop {
let revision = self.revision.load(atomic::Ordering::Relaxed);
if revision != REVISION_UNINITIALIZED {
// Fast and likely path.
let (_, output) = view.view(self.client.clone(), revision).await?;
return Ok(output);
} else {
// Slow path
let _g = self.init_lock.lock().await;
let revision = self.revision.load(atomic::Ordering::Acquire);
if revision != REVISION_UNINITIALIZED {
// Double check failed, release the lock.
continue;
}
let (new_revision, output) = view.view(self.client.clone(), revision).await?;
self.revision.store(new_revision, atomic::Ordering::Release);
return Ok(output);
}
}
}
}

trait SnapshotViewer {
type Output;
type OutputFuture<'a>: Future<Output = Result<(i64, Self::Output)>> + 'a
where
Self: 'a;

fn view(&self, client: KvClient, revision: i64) -> Self::OutputFuture<'_>;
}

struct GetViewer {
key: Vec<u8>,
}

impl SnapshotViewer for GetViewer {
type Output = Vec<u8>;
type OutputFuture<'a> = impl Future<Output = Result<(i64, Self::Output)>> + 'a;

fn view(&self, mut client: KvClient, revision: i64) -> Self::OutputFuture<'_> {
async move {
let res = client
.get(
self.key.clone(),
Some(GetOptions::default().with_revision(revision)),
)
.await?;
let new_revision = if let Some(header) = res.header() {
header.revision()
} else {
return Err(Error::Internal(anyhow::anyhow!(
"Etcd response missing header"
)));
};
let value = res
.kvs()
.first()
.map(|kv| kv.value().to_vec())
.ok_or_else(|| Error::ItemNotFound(hex::encode(self.key.clone())))?;
Ok((new_revision, value))
}
}
}

struct ListViewer {
key: Vec<u8>,
}

impl SnapshotViewer for ListViewer {
type Output = Vec<Vec<u8>>;
type OutputFuture<'a> = impl Future<Output = Result<(i64, Self::Output)>> + 'a;

fn view(&self, mut client: KvClient, revision: i64) -> Self::OutputFuture<'_> {
async move {
let res = client
.get(
self.key.clone(),
Some(GetOptions::default().with_revision(revision).with_prefix()),
)
.await?;
let new_revision = if let Some(header) = res.header() {
header.revision()
} else {
return Err(Error::Internal(anyhow::anyhow!(
"Etcd response missing header"
)));
};
let value = res.kvs().iter().map(|kv| kv.value().to_vec()).collect();
Ok((new_revision, value))
}
}
}

#[async_trait]
impl Snapshot for EtcdSnapshot {
async fn list_cf(&self, cf: &str) -> Result<Vec<Vec<u8>>> {
let view = ListViewer {
key: encode_etcd_key(cf, &[]),
};
self.view_inner(view).await
}

async fn get_cf(&self, cf: &str, key: &[u8]) -> Result<Vec<u8>> {
let view = GetViewer {
key: encode_etcd_key(cf, key),
};
self.view_inner(view).await
}
}

impl EtcdMetaStore {
pub fn new(client: Client) -> Self {
Self { client }
}
}

#[async_trait]
impl MetaStore for EtcdMetaStore {
type Snapshot = EtcdSnapshot;

fn snapshot(&self) -> Self::Snapshot {
EtcdSnapshot {
client: self.client.kv_client(),
revision: AtomicI64::new(REVISION_UNINITIALIZED),
init_lock: Default::default(),
}
}

async fn put_cf(&self, cf: &str, key: Key, value: Value) -> Result<()> {
self.client
.kv_client()
.put(encode_etcd_key(cf, &key), value, None)
.await?;
Ok(())
}

async fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> {
self.client
.kv_client()
.delete(encode_etcd_key(cf, key), None)
.await?;
Ok(())
}

async fn txn(&self, trx: Transaction) -> Result<()> {
let (preconditions, operations) = trx.into_parts();
let when = preconditions
.into_iter()
.map(|cond| match cond {
super::Precondition::KeyExists { cf, key } => {
Compare::value(encode_etcd_key(&cf, &key), CompareOp::Equal, vec![])
}
})
.collect::<Vec<_>>();

let then = operations
.into_iter()
.map(|op| match op {
super::Operation::Put { cf, key, value } => {
let key = encode_etcd_key(&cf, &key);
let value = value.to_vec();
TxnOp::put(key, value, None)
}
super::Operation::Delete { cf, key } => {
let key = encode_etcd_key(&cf, &key);
TxnOp::delete(key, None)
}
})
.collect::<Vec<_>>();

let etcd_txn = Txn::new().when(when).and_then(then);
self.client.kv_client().txn(etcd_txn).await?;
Ok(())
}
}
Loading