From 8b8ff66b1c4eda038329bf764e1d267eb905a74d Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Thu, 3 Mar 2022 17:01:22 +0800 Subject: [PATCH] fix(meta): validate cluster membership in hummock RPC (#682) --- rust/meta/src/hummock/hummock_manager.rs | 28 ++++- .../meta/src/hummock/hummock_manager_tests.rs | 114 +++++++++++++++--- rust/meta/src/model/cluster.rs | 4 + 3 files changed, 121 insertions(+), 25 deletions(-) diff --git a/rust/meta/src/hummock/hummock_manager.rs b/rust/meta/src/hummock/hummock_manager.rs index eb498eb915261..32166d79f2012 100644 --- a/rust/meta/src/hummock/hummock_manager.rs +++ b/rust/meta/src/hummock/hummock_manager.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use itertools::Itertools; +use prost::Message; use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::hummock::hummock_version::HummockVersionRefId; use risingwave_pb::hummock::{ @@ -19,8 +20,8 @@ use crate::hummock::model::{ CurrentHummockVersionId, HummockContextPinnedSnapshotExt, HummockContextPinnedVersionExt, }; use crate::manager::{IdCategory, IdGeneratorManagerRef, MetaSrvEnv}; -use crate::model::{MetadataModel, Transactional}; -use crate::storage::{MetaStore, Transaction}; +use crate::model::{MetadataModel, Transactional, Worker}; +use crate::storage::{Error, MetaStore, Transaction}; pub struct HummockManager { id_gen_manager_ref: IdGeneratorManagerRef, @@ -104,20 +105,35 @@ where }; init_version.upsert_in_transaction(&mut transaction)?; - // TODO #93: Cancel all compact_tasks + // TODO #546: Cancel all compact_tasks self.commit_trx(compaction_guard.meta_store_ref.as_ref(), transaction, None) .await } + /// We use worker node id as the `context_id`. + /// If the `context_id` is provided, the transaction will abort if the `context_id` is not + /// valid, which means the worker node is not a valid member of the cluster. 
async fn commit_trx( &self, meta_store_ref: &S, - trx: Transaction, + mut trx: Transaction, context_id: Option, ) -> Result<()> { - if let Some(_context_id) = context_id { - // TODO check context validity + if let Some(context_id) = context_id { + // Get the worker's key in meta store + let workers = Worker::list(meta_store_ref) + .await? + .into_iter() + .filter(|worker| worker.worker_id() == context_id) + .collect_vec(); + assert!(workers.len() <= 1); + if let Some(worker) = workers.first() { + trx.check_exists(Worker::cf_name(), worker.key()?.encode_to_vec()); + } else { + // The worker is not found in cluster. + return Err(Error::TransactionAbort().into()); + } } meta_store_ref.txn(trx).await.map_err(Into::into) } diff --git a/rust/meta/src/hummock/hummock_manager_tests.rs b/rust/meta/src/hummock/hummock_manager_tests.rs index 6202cd79c0a0a..540bca6689e32 100644 --- a/rust/meta/src/hummock/hummock_manager_tests.rs +++ b/rust/meta/src/hummock/hummock_manager_tests.rs @@ -2,22 +2,54 @@ use std::cmp::Ordering; use std::sync::Arc; use itertools::Itertools; -use risingwave_common::error::Result; +use risingwave_common::error::{ErrorCode, Result}; +use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; use risingwave_pb::hummock::{ HummockContextPinnedSnapshot, HummockContextPinnedVersion, HummockSnapshot, SstableInfo, }; -use risingwave_storage::hummock::{FIRST_VERSION_ID, INVALID_EPOCH}; +use risingwave_storage::hummock::{HummockContextId, FIRST_VERSION_ID, INVALID_EPOCH}; +use crate::cluster::StoredClusterManager; use crate::hummock::test_utils::*; use crate::hummock::HummockManager; -use crate::manager::MetaSrvEnv; +use crate::manager::{MetaSrvEnv, NotificationManager}; use crate::model::MetadataModel; +use crate::storage::MemStore; + +async fn setup_compute_env( + port: i32, +) -> ( + MetaSrvEnv, + Arc>, + Arc>, + WorkerNode, +) { + let env = MetaSrvEnv::for_test().await; + let hummock_manager = 
Arc::new(HummockManager::new(env.clone()).await.unwrap()); + let cluster_manager = Arc::new( + StoredClusterManager::new( + env.clone(), + Some(hummock_manager.clone()), + Arc::new(NotificationManager::new()), + ) + .await + .unwrap(), + ); + let fake_host_address = HostAddress { + host: "127.0.0.1".to_string(), + port, + }; + let (worker_node, _) = cluster_manager + .add_worker_node(fake_host_address, WorkerType::ComputeNode) + .await + .unwrap(); + (env, hummock_manager, cluster_manager, worker_node) +} #[tokio::test] async fn test_hummock_pin_unpin() -> Result<()> { - let env = MetaSrvEnv::for_test().await; - let hummock_manager = HummockManager::new(env.clone()).await?; - let context_id = 0; + let (env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; + let context_id = worker_node.id; let version_id = FIRST_VERSION_ID; let epoch = INVALID_EPOCH; @@ -77,9 +109,8 @@ async fn test_hummock_pin_unpin() -> Result<()> { #[tokio::test] async fn test_hummock_get_compact_task() -> Result<()> { - let env = MetaSrvEnv::for_test().await; - let hummock_manager = HummockManager::new(env.clone()).await?; - let context_id = 0; + let (_env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; + let context_id = worker_node.id; let task = hummock_manager.get_compact_task().await?; assert_eq!(task, None); @@ -110,9 +141,8 @@ async fn test_hummock_get_compact_task() -> Result<()> { #[tokio::test] async fn test_hummock_table() -> Result<()> { - let env = MetaSrvEnv::for_test().await; - let hummock_manager = HummockManager::new(env.clone()).await?; - let context_id = 0; + let (env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; + let context_id = worker_node.id; let epoch: u64 = 1; let mut table_id = 1; @@ -154,9 +184,8 @@ async fn test_hummock_table() -> Result<()> { #[tokio::test] async fn test_hummock_transaction() -> Result<()> { - let env = MetaSrvEnv::for_test().await; - let hummock_manager 
= HummockManager::new(env.clone()).await?; - let context_id = 0; + let (_env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; + let context_id = worker_node.id; let mut table_id = 1; let mut committed_tables = vec![]; @@ -348,10 +377,19 @@ async fn test_hummock_transaction() -> Result<()> { #[tokio::test] async fn test_release_context_resource() -> Result<()> { - let env = MetaSrvEnv::for_test().await; - let hummock_manager = Arc::new(HummockManager::new(env.clone()).await?); - let context_id_1 = 1; - let context_id_2 = 2; + let (env, hummock_manager, cluster_manager, worker_node) = setup_compute_env(1).await; + let context_id_1 = worker_node.id; + + let fake_host_address_2 = HostAddress { + host: "127.0.0.1".to_string(), + port: 2, + }; + let (worker_node_2, _) = cluster_manager + .add_worker_node(fake_host_address_2, WorkerType::ComputeNode) + .await + .unwrap(); + let context_id_2 = worker_node_2.id; + assert_eq!( HummockContextPinnedVersion::list(&*env.meta_store_ref()) .await @@ -423,3 +461,41 @@ async fn test_release_context_resource() -> Result<()> { ); Ok(()) } + +#[tokio::test] +async fn test_context_id_validation() { + let (_env, hummock_manager, cluster_manager, worker_node) = setup_compute_env(80).await; + let invalid_context_id = HummockContextId::MAX; + let context_id = worker_node.id; + let epoch: u64 = 1; + let mut table_id = 1; + let original_tables = generate_test_tables(epoch, &mut table_id); + + // Invalid context id is rejected. + let error = hummock_manager + .add_tables(invalid_context_id, original_tables.clone(), epoch) + .await + .unwrap_err(); + assert!(matches!(error.inner(), ErrorCode::InternalError(_))); + assert_eq!(error.to_string(), "internal error: transaction aborted"); + + // Valid context id is accepted. + hummock_manager + .add_tables(context_id, original_tables.clone(), epoch) + .await + .unwrap(); + + hummock_manager.pin_version(context_id).await.unwrap(); + // Pin multiple times is OK. 
+ hummock_manager.pin_version(context_id).await.unwrap(); + + // Removing the node from the cluster invalidates its context id. + cluster_manager + .delete_worker_node(worker_node.host.unwrap()) + .await + .unwrap(); + // Invalid context id is rejected. + let error = hummock_manager.pin_version(context_id).await.unwrap_err(); + assert!(matches!(error.inner(), ErrorCode::InternalError(_))); + assert_eq!(error.to_string(), "internal error: transaction aborted"); +} diff --git a/rust/meta/src/model/cluster.rs b/rust/meta/src/model/cluster.rs index d48d5bf309535..8c5ed625fcbf0 100644 --- a/rust/meta/src/model/cluster.rs +++ b/rust/meta/src/model/cluster.rs @@ -38,4 +38,8 @@ impl Worker { pub fn worker_node(&self) -> WorkerNode { self.0.clone() } + + pub fn worker_id(&self) -> u32 { + self.0.id + } }