Skip to content

Commit

Permalink
fix(meta): validate cluster membership in hummock RPC (#682)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Mar 3, 2022
1 parent 9ddfe63 commit 8b8ff66
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 25 deletions.
28 changes: 22 additions & 6 deletions rust/meta/src/hummock/hummock_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use itertools::Itertools;
use prost::Message;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::hummock::hummock_version::HummockVersionRefId;
use risingwave_pb::hummock::{
Expand All @@ -19,8 +20,8 @@ use crate::hummock::model::{
CurrentHummockVersionId, HummockContextPinnedSnapshotExt, HummockContextPinnedVersionExt,
};
use crate::manager::{IdCategory, IdGeneratorManagerRef, MetaSrvEnv};
use crate::model::{MetadataModel, Transactional};
use crate::storage::{MetaStore, Transaction};
use crate::model::{MetadataModel, Transactional, Worker};
use crate::storage::{Error, MetaStore, Transaction};

pub struct HummockManager<S> {
id_gen_manager_ref: IdGeneratorManagerRef<S>,
Expand Down Expand Up @@ -104,20 +105,35 @@ where
};
init_version.upsert_in_transaction(&mut transaction)?;

// TODO #93: Cancel all compact_tasks
// TODO #546: Cancel all compact_tasks

self.commit_trx(compaction_guard.meta_store_ref.as_ref(), transaction, None)
.await
}

/// We use worker node id as the `context_id`.
/// If the `context_id` is provided, the transaction will abort if the `context_id` is not
/// valid, which means the worker node is not a valid member of the cluster.
async fn commit_trx(
&self,
meta_store_ref: &S,
trx: Transaction,
mut trx: Transaction,
context_id: Option<HummockContextId>,
) -> Result<()> {
if let Some(_context_id) = context_id {
// TODO check context validity
if let Some(context_id) = context_id {
// Get the worker's key in meta store
let workers = Worker::list(meta_store_ref)
.await?
.into_iter()
.filter(|worker| worker.worker_id() == context_id)
.collect_vec();
assert!(workers.len() <= 1);
if let Some(worker) = workers.first() {
trx.check_exists(Worker::cf_name(), worker.key()?.encode_to_vec());
} else {
// The worker is not found in cluster.
return Err(Error::TransactionAbort().into());
}
}
meta_store_ref.txn(trx).await.map_err(Into::into)
}
Expand Down
114 changes: 95 additions & 19 deletions rust/meta/src/hummock/hummock_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,54 @@ use std::cmp::Ordering;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::error::Result;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
use risingwave_pb::hummock::{
HummockContextPinnedSnapshot, HummockContextPinnedVersion, HummockSnapshot, SstableInfo,
};
use risingwave_storage::hummock::{FIRST_VERSION_ID, INVALID_EPOCH};
use risingwave_storage::hummock::{HummockContextId, FIRST_VERSION_ID, INVALID_EPOCH};

use crate::cluster::StoredClusterManager;
use crate::hummock::test_utils::*;
use crate::hummock::HummockManager;
use crate::manager::MetaSrvEnv;
use crate::manager::{MetaSrvEnv, NotificationManager};
use crate::model::MetadataModel;
use crate::storage::MemStore;

async fn setup_compute_env(
port: i32,
) -> (
MetaSrvEnv<MemStore>,
Arc<HummockManager<MemStore>>,
Arc<StoredClusterManager<MemStore>>,
WorkerNode,
) {
let env = MetaSrvEnv::for_test().await;
let hummock_manager = Arc::new(HummockManager::new(env.clone()).await.unwrap());
let cluster_manager = Arc::new(
StoredClusterManager::new(
env.clone(),
Some(hummock_manager.clone()),
Arc::new(NotificationManager::new()),
)
.await
.unwrap(),
);
let fake_host_address = HostAddress {
host: "127.0.0.1".to_string(),
port,
};
let (worker_node, _) = cluster_manager
.add_worker_node(fake_host_address, WorkerType::ComputeNode)
.await
.unwrap();
(env, hummock_manager, cluster_manager, worker_node)
}

#[tokio::test]
async fn test_hummock_pin_unpin() -> Result<()> {
let env = MetaSrvEnv::for_test().await;
let hummock_manager = HummockManager::new(env.clone()).await?;
let context_id = 0;
let (env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await;
let context_id = worker_node.id;
let version_id = FIRST_VERSION_ID;
let epoch = INVALID_EPOCH;

Expand Down Expand Up @@ -77,9 +109,8 @@ async fn test_hummock_pin_unpin() -> Result<()> {

#[tokio::test]
async fn test_hummock_get_compact_task() -> Result<()> {
let env = MetaSrvEnv::for_test().await;
let hummock_manager = HummockManager::new(env.clone()).await?;
let context_id = 0;
let (_env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await;
let context_id = worker_node.id;

let task = hummock_manager.get_compact_task().await?;
assert_eq!(task, None);
Expand Down Expand Up @@ -110,9 +141,8 @@ async fn test_hummock_get_compact_task() -> Result<()> {

#[tokio::test]
async fn test_hummock_table() -> Result<()> {
let env = MetaSrvEnv::for_test().await;
let hummock_manager = HummockManager::new(env.clone()).await?;
let context_id = 0;
let (env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await;
let context_id = worker_node.id;

let epoch: u64 = 1;
let mut table_id = 1;
Expand Down Expand Up @@ -154,9 +184,8 @@ async fn test_hummock_table() -> Result<()> {

#[tokio::test]
async fn test_hummock_transaction() -> Result<()> {
let env = MetaSrvEnv::for_test().await;
let hummock_manager = HummockManager::new(env.clone()).await?;
let context_id = 0;
let (_env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await;
let context_id = worker_node.id;
let mut table_id = 1;
let mut committed_tables = vec![];

Expand Down Expand Up @@ -348,10 +377,19 @@ async fn test_hummock_transaction() -> Result<()> {

#[tokio::test]
async fn test_release_context_resource() -> Result<()> {
let env = MetaSrvEnv::for_test().await;
let hummock_manager = Arc::new(HummockManager::new(env.clone()).await?);
let context_id_1 = 1;
let context_id_2 = 2;
let (env, hummock_manager, cluster_manager, worker_node) = setup_compute_env(1).await;
let context_id_1 = worker_node.id;

let fake_host_address_2 = HostAddress {
host: "127.0.0.1".to_string(),
port: 2,
};
let (worker_node_2, _) = cluster_manager
.add_worker_node(fake_host_address_2, WorkerType::ComputeNode)
.await
.unwrap();
let context_id_2 = worker_node_2.id;

assert_eq!(
HummockContextPinnedVersion::list(&*env.meta_store_ref())
.await
Expand Down Expand Up @@ -423,3 +461,41 @@ async fn test_release_context_resource() -> Result<()> {
);
Ok(())
}

#[tokio::test]
async fn test_context_id_validation() {
let (_env, hummock_manager, cluster_manager, worker_node) = setup_compute_env(80).await;
let invalid_context_id = HummockContextId::MAX;
let context_id = worker_node.id;
let epoch: u64 = 1;
let mut table_id = 1;
let original_tables = generate_test_tables(epoch, &mut table_id);

// Invalid context id is rejected.
let error = hummock_manager
.add_tables(invalid_context_id, original_tables.clone(), epoch)
.await
.unwrap_err();
assert!(matches!(error.inner(), ErrorCode::InternalError(_)));
assert_eq!(error.to_string(), "internal error: transaction aborted");

// Valid context id is accepted.
hummock_manager
.add_tables(context_id, original_tables.clone(), epoch)
.await
.unwrap();

hummock_manager.pin_version(context_id).await.unwrap();
// Pin multiple times is OK.
hummock_manager.pin_version(context_id).await.unwrap();

// Remove the node from cluster will invalidate context id.
cluster_manager
.delete_worker_node(worker_node.host.unwrap())
.await
.unwrap();
// Invalid context id is rejected.
let error = hummock_manager.pin_version(context_id).await.unwrap_err();
assert!(matches!(error.inner(), ErrorCode::InternalError(_)));
assert_eq!(error.to_string(), "internal error: transaction aborted");
}
4 changes: 4 additions & 0 deletions rust/meta/src/model/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ impl Worker {
pub fn worker_node(&self) -> WorkerNode {
self.0.clone()
}

pub fn worker_id(&self) -> u32 {
self.0.id
}
}

0 comments on commit 8b8ff66

Please sign in to comment.