From c93a4f4b0d8857ec874d816a434c72bcf50fc743 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 1 Nov 2022 13:16:33 +0800 Subject: [PATCH] refactor(meta): return InvalidReply if fail to decode a message During an upgrade, different versions of databend-query/meta run in the same cluster. If a message that cannot be decoded is received, return an `InvalidReply` error instead of just panicking the program. Returning such an error might make the cluster unable to operate while upgrading, but data durability and consistency won't be compromised. --- src/meta/service/src/meta_service/raftmeta.rs | 35 ++++------ .../tests/it/meta_node/meta_node_seq_api.rs | 4 +- src/meta/types/src/errors/kv_app_errors.rs | 8 +++ src/meta/types/src/errors/meta_api_errors.rs | 8 +++ src/meta/types/src/errors/meta_errors.rs | 8 +++ src/meta/types/src/message.rs | 68 +++++++++++++++++-- 6 files changed, 102 insertions(+), 29 deletions(-) diff --git a/src/meta/service/src/meta_service/raftmeta.rs b/src/meta/service/src/meta_service/raftmeta.rs index 7e09d19299945..b5b9d0a89f290 100644 --- a/src/meta/service/src/meta_service/raftmeta.rs +++ b/src/meta/service/src/meta_service/raftmeta.rs @@ -640,30 +640,19 @@ impl MetaNode { Ok(r) => { let reply = r.into_inner(); - if reply.error.is_empty() { - // No error. 
It does not have to decode an error from an old databend-meta - - let res: Result = reply.into(); - match res { - Ok(v) => { - info!("join cluster via {} success: {:?}", addr, v); - return Ok(Ok(())); - } - Err(e) => { - error!("join cluster via {} fail: {}", addr, e.to_string()); - errors.push( - AnyError::new(&e) - .add_context(|| format!("join via: {}", addr.clone())), - ); - } + let res: Result = reply.into(); + match res { + Ok(v) => { + info!("join cluster via {} success: {:?}", addr, v); + return Ok(Ok(())); + } + Err(e) => { + error!("join cluster via {} fail: {}", addr, e.to_string()); + errors.push( + AnyError::new(&e) + .add_context(|| format!("join via: {}", addr.clone())), + ); } - } else { - // TODO: workaround: error type changed. new version of databend-meta does not understand old databend-meta error. - error!("join cluster via {} fail: {}", addr, &reply.error); - errors.push( - AnyError::error(&reply.error) - .add_context(|| format!("join via: {}", addr.clone())), - ); } } Err(s) => { diff --git a/src/meta/service/tests/it/meta_node/meta_node_seq_api.rs b/src/meta/service/tests/it/meta_node/meta_node_seq_api.rs index aa31e437b801c..207c36e4a62c6 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_seq_api.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_seq_api.rs @@ -20,7 +20,7 @@ use common_meta_types::protobuf::raft_service_client::RaftServiceClient; use common_meta_types::AppliedState; use common_meta_types::Cmd; use common_meta_types::LogEntry; -use common_meta_types::RetryableError; +use common_meta_types::MetaError; use databend_meta::init_meta_ut; use databend_meta::meta_service::MetaNode; @@ -46,7 +46,7 @@ async fn test_meta_node_incr_seq() -> anyhow::Result<()> { }; let raft_reply = client.write(req).await?.into_inner(); - let res: Result = raft_reply.into(); + let res: Result = raft_reply.into(); let resp: AppliedState = res?; match resp { AppliedState::Seq { seq } => { diff --git a/src/meta/types/src/errors/kv_app_errors.rs 
b/src/meta/types/src/errors/kv_app_errors.rs index 09c6bfe10a428..f35eabbf76af0 100644 --- a/src/meta/types/src/errors/kv_app_errors.rs +++ b/src/meta/types/src/errors/kv_app_errors.rs @@ -17,6 +17,7 @@ use common_meta_stoerr::MetaStorageError; use tonic::Status; use crate::AppError; +use crate::InvalidReply; use crate::MetaAPIError; use crate::MetaClientError; use crate::MetaError; @@ -90,3 +91,10 @@ impl From for KVAppError { Self::MetaError(meta_err) } } + +impl From for KVAppError { + fn from(e: InvalidReply) -> Self { + let meta_err = MetaError::from(e); + Self::MetaError(meta_err) + } +} diff --git a/src/meta/types/src/errors/meta_api_errors.rs b/src/meta/types/src/errors/meta_api_errors.rs index 012ad1bfbe090..0e5c6b075b997 100644 --- a/src/meta/types/src/errors/meta_api_errors.rs +++ b/src/meta/types/src/errors/meta_api_errors.rs @@ -20,6 +20,7 @@ use openraft::error::ChangeMembershipError; use openraft::error::Fatal; use openraft::error::ForwardToLeader; +use crate::InvalidReply; use crate::MetaNetworkError; /// Errors raised when meta-service handling a request. 
@@ -125,3 +126,10 @@ impl From for MetaOperationError { MetaOperationError::from(de) } } + +impl From for MetaAPIError { + fn from(e: InvalidReply) -> Self { + let net_err = MetaNetworkError::from(e); + Self::NetworkError(net_err) + } +} diff --git a/src/meta/types/src/errors/meta_errors.rs b/src/meta/types/src/errors/meta_errors.rs index e22d42c2643fb..a28bc17834834 100644 --- a/src/meta/types/src/errors/meta_errors.rs +++ b/src/meta/types/src/errors/meta_errors.rs @@ -18,6 +18,7 @@ use serde::Deserialize; use serde::Serialize; use thiserror::Error; +use crate::InvalidReply; use crate::MetaAPIError; use crate::MetaClientError; use crate::MetaNetworkError; @@ -58,3 +59,10 @@ impl From for MetaError { MetaError::NetworkError(net_err) } } + +impl From for MetaError { + fn from(e: InvalidReply) -> Self { + let api_err = MetaAPIError::from(e); + Self::APIError(api_err) + } +} diff --git a/src/meta/types/src/message.rs b/src/meta/types/src/message.rs index b207305c9cad0..030080eb12a83 100644 --- a/src/meta/types/src/message.rs +++ b/src/meta/types/src/message.rs @@ -26,6 +26,7 @@ use crate::AppliedState; use crate::Endpoint; use crate::GetKVReply; use crate::GetKVReq; +use crate::InvalidReply; use crate::ListKVReply; use crate::ListKVReq; use crate::LogEntry; @@ -215,14 +216,16 @@ impl From for RaftReply { impl From for Result where T: DeserializeOwned, - E: DeserializeOwned, + E: DeserializeOwned + From, { fn from(msg: RaftReply) -> Self { if !msg.data.is_empty() { - let resp: T = serde_json::from_str(&msg.data).expect("fail to deserialize"); - Ok(resp) + let res: T = serde_json::from_str(&msg.data) + .map_err(|e| InvalidReply::new("can not decode RaftReply.data", &e))?; + Ok(res) } else { - let err: E = serde_json::from_str(&msg.error).expect("fail to deserialize"); + let err: E = serde_json::from_str(&msg.error) + .map_err(|e| InvalidReply::new("can not decode RaftReply.error", &e))?; Err(err) } } @@ -267,3 +270,60 @@ where E: DeserializeOwned } } } + +#[cfg(test)] 
+mod tests { + + #[derive(serde::Serialize, serde::Deserialize)] + struct Foo { + i: i32, + } + + use crate::protobuf::RaftReply; + use crate::MetaNetworkError; + + #[test] + fn test_valid_reply() -> anyhow::Result<()> { + // Unable to decode `.data` + + let msg = RaftReply { + data: "foo".to_string(), + error: "".to_string(), + }; + let res: Result = msg.into(); + match res { + Err(MetaNetworkError::InvalidReply(inv_reply)) => { + assert!( + inv_reply + .to_string() + .starts_with("InvalidReply: can not decode RaftReply.data") + ); + } + _ => { + unreachable!("expect InvalidReply") + } + } + + // Unable to decode `.error` + + let msg = RaftReply { + data: "".to_string(), + error: "foo".to_string(), + }; + let res: Result = msg.into(); + match res { + Err(MetaNetworkError::InvalidReply(inv_reply)) => { + assert!( + inv_reply + .to_string() + .starts_with("InvalidReply: can not decode RaftReply.error") + ); + } + _ => { + unreachable!("expect InvalidReply") + } + } + + Ok(()) + } +}