Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): Support flight_connection_max_retry_times and flight_connection_retry_interval setting #16856

Merged
merged 7 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/exception/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ bincode = { workspace = true }
geozero = { workspace = true }
gimli = { workspace = true }
http = { workspace = true }
hyper = { workspace = true }
libc = { workspace = true }
object = { workspace = true }
once_cell = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions src/common/exception/src/exception_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,13 @@ impl From<tonic::Status> for ErrorCode {
tonic::Code::Unknown => {
let details = status.details();
if details.is_empty() {
if status.source().map_or(false, |e| e.is::<hyper::Error>()) {
return ErrorCode::CannotConnectNode(format!(
"{}, source: {:?}",
status.message(),
status.source()
));
}
return ErrorCode::UnknownException(format!(
"{}, source: {:?}",
status.message(),
Expand Down
67 changes: 50 additions & 17 deletions src/query/service/src/clusters/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ use futures::future::Either;
use futures::Future;
use futures::StreamExt;
use log::error;
use log::info;
use log::warn;
use parking_lot::RwLock;
use rand::thread_rng;
use rand::Rng;
use serde::Deserialize;
use serde::Serialize;
use tokio::time::sleep;

use crate::servers::flight::FlightClient;

Expand All @@ -81,11 +83,11 @@ pub trait ClusterHelper {

fn get_nodes(&self) -> Vec<Arc<NodeInfo>>;

async fn do_action<T: Serialize + Send, Res: for<'de> Deserialize<'de> + Send>(
async fn do_action<T: Serialize + Send + Clone, Res: for<'de> Deserialize<'de> + Send>(
&self,
path: &str,
message: HashMap<String, T>,
timeout: u64,
flight_params: FlightParams,
) -> Result<HashMap<String, Res>>;
}

Expand Down Expand Up @@ -118,11 +120,11 @@ impl ClusterHelper for Cluster {
self.nodes.to_vec()
}

async fn do_action<T: Serialize + Send, Res: for<'de> Deserialize<'de> + Send>(
async fn do_action<T: Serialize + Send + Clone, Res: for<'de> Deserialize<'de> + Send>(
&self,
path: &str,
message: HashMap<String, T>,
timeout: u64,
flight_params: FlightParams,
) -> Result<HashMap<String, Res>> {
fn get_node<'a>(nodes: &'a [Arc<NodeInfo>], id: &str) -> Result<&'a Arc<NodeInfo>> {
for node in nodes {
Expand All @@ -137,23 +139,47 @@ impl ClusterHelper for Cluster {
)))
}

let mut response = HashMap::with_capacity(message.len());
let mut futures = Vec::with_capacity(message.len());
for (id, message) in message {
let node = get_node(&self.nodes, &id)?;

let config = GlobalConfig::instance();
let flight_address = node.flight_address.clone();
let node_secret = node.secret.clone();

let mut conn = create_client(&config, &flight_address).await?;
response.insert(
id,
conn.do_action::<_, Res>(path, node_secret, message, timeout)
.await?,
);
futures.push({
let config = GlobalConfig::instance();
let flight_address = node.flight_address.clone();
let node_secret = node.secret.clone();

async move {
let mut attempt = 0;

loop {
let mut conn = create_client(&config, &flight_address).await?;
match conn
.do_action::<_, Res>(
path,
node_secret.clone(),
message.clone(),
flight_params.timeout,
)
.await
{
Ok(result) => return Ok((id, result)),
Err(e)
if e.code() == ErrorCode::CANNOT_CONNECT_NODE
&& attempt < flight_params.retry_times =>
{
// only retry when error is network problem
info!("retry do_action, attempt: {}", attempt);
attempt += 1;
sleep(Duration::from_secs(flight_params.retry_interval)).await;
}
Err(e) => return Err(e),
}
}
}
});
}

Ok(response)
let responses: Vec<(String, Res)> = futures::future::try_join_all(futures).await?;
Ok(responses.into_iter().collect::<HashMap<String, Res>>())
}
}

Expand Down Expand Up @@ -537,3 +563,10 @@ pub async fn create_client(config: &InnerConfig, address: &str) -> Result<Flight
ConnectionFactory::create_rpc_channel(address.to_owned(), timeout, rpc_tls_config).await?,
)))
}

/// Per-call flight settings accepted by `ClusterHelper::do_action`.
///
/// The type is `Copy`, so one value can be handed to several `do_action`
/// calls without cloning.
#[derive(Debug, Clone, Copy)]
pub struct FlightParams {
    /// Timeout forwarded unchanged to the flight client's `do_action` call.
    // NOTE(review): unit is presumably seconds — confirm against the flight client.
    pub(crate) timeout: u64,
    /// Upper bound on retries after a `CANNOT_CONNECT_NODE` error;
    /// with `0` the first such error is returned immediately.
    pub(crate) retry_times: u64,
    /// Delay between two retry attempts, in seconds.
    pub(crate) retry_interval: u64,
}
1 change: 1 addition & 0 deletions src/query/service/src/clusters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ mod cluster;
pub use cluster::Cluster;
pub use cluster::ClusterDiscovery;
pub use cluster::ClusterHelper;
pub use cluster::FlightParams;
9 changes: 7 additions & 2 deletions src/query/service/src/interpreters/interpreter_kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_exception::Result;
use databend_common_sql::plans::KillPlan;

use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::actions::KILL_QUERY;
Expand Down Expand Up @@ -54,7 +55,11 @@ impl KillInterpreter {
async fn kill_cluster_query(&self) -> Result<PipelineBuildResult> {
let cluster = self.ctx.get_cluster();
let settings = self.ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let flight_params = FlightParams {
timeout: settings.get_flight_client_timeout()?,
retry_times: settings.get_flight_max_retry_times()?,
retry_interval: settings.get_flight_retry_interval()?,
};

let mut message = HashMap::with_capacity(cluster.nodes.len());

Expand All @@ -65,7 +70,7 @@ impl KillInterpreter {
}

let res = cluster
.do_action::<_, bool>(KILL_QUERY, message, timeout)
.do_action::<_, bool>(KILL_QUERY, message, flight_params)
.await?;

match res.values().any(|x| *x) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_exception::Result;
use databend_common_sql::plans::SetPriorityPlan;

use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::actions::SET_PRIORITY;
Expand Down Expand Up @@ -61,9 +62,13 @@ impl SetPriorityInterpreter {
}

let settings = self.ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let flight_params = FlightParams {
timeout: settings.get_flight_client_timeout()?,
retry_times: settings.get_flight_max_retry_times()?,
retry_interval: settings.get_flight_retry_interval()?,
};
let res = cluster
.do_action::<_, bool>(SET_PRIORITY, message, timeout)
.do_action::<_, bool>(SET_PRIORITY, message, flight_params)
.await?;

match res.values().any(|x| *x) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use databend_common_sql::plans::SystemAction;
use databend_common_sql::plans::SystemPlan;

use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::actions::SYSTEM_ACTION;
Expand Down Expand Up @@ -74,9 +75,13 @@ impl Interpreter for SystemActionInterpreter {
}

let settings = self.ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let flight_params = FlightParams {
timeout: settings.get_flight_client_timeout()?,
retry_times: settings.get_flight_max_retry_times()?,
retry_interval: settings.get_flight_retry_interval()?,
};
cluster
.do_action::<_, ()>(SYSTEM_ACTION, message, timeout)
.do_action::<_, ()>(SYSTEM_ACTION, message, flight_params)
.await?;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_exception::Result;
use databend_common_sql::plans::TruncateTablePlan;

use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::actions::TRUNCATE_TABLE;
Expand Down Expand Up @@ -95,9 +96,13 @@ impl Interpreter for TruncateTableInterpreter {
}

let settings = self.ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let flight_params = FlightParams {
timeout: settings.get_flight_client_timeout()?,
retry_times: settings.get_flight_max_retry_times()?,
retry_interval: settings.get_flight_retry_interval()?,
};
cluster
.do_action::<_, ()>(TRUNCATE_TABLE, message, timeout)
.do_action::<_, ()>(TRUNCATE_TABLE, message, flight_params)
.await?;
}

Expand Down
8 changes: 7 additions & 1 deletion src/query/service/src/servers/admin/v1/query_profiling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use poem::IntoResponse;

use crate::clusters::ClusterDiscovery;
use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::servers::flight::v1::actions::GET_PROFILE;
use crate::sessions::SessionManager;

Expand Down Expand Up @@ -104,8 +105,13 @@ async fn get_cluster_profile(query_id: &str) -> Result<Vec<PlanProfile>, ErrorCo
}
}

let flight_params = FlightParams {
timeout: 60,
retry_times: 3,
retry_interval: 3,
};
let res = cluster
.do_action::<_, Option<Vec<PlanProfile>>>(GET_PROFILE, message, 60)
.do_action::<_, Option<Vec<PlanProfile>>>(GET_PROFILE, message, flight_params)
.await?;

match res.into_values().find(Option::is_some) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use super::exchange_transform::ExchangeTransform;
use super::statistics_receiver::StatisticsReceiver;
use super::statistics_sender::StatisticsSender;
use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -416,13 +417,17 @@ impl DataExchangeManager {
actions: QueryFragmentsActions,
) -> Result<PipelineBuildResult> {
let settings = ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let flight_params = FlightParams {
timeout: settings.get_flight_client_timeout()?,
retry_times: settings.get_flight_max_retry_times()?,
retry_interval: settings.get_flight_retry_interval()?,
};
let root_actions = actions.get_root_actions()?;
let conf = GlobalConfig::instance();

// Initialize query env between cluster nodes
let query_env = actions.get_query_env()?;
query_env.init(&ctx, timeout).await?;
query_env.init(&ctx, flight_params).await?;

// Submit distributed tasks to all nodes.
let cluster = ctx.get_cluster();
Expand All @@ -431,7 +436,7 @@ impl DataExchangeManager {
let local_fragments = query_fragments.remove(&conf.query.node_id);

let _: HashMap<String, ()> = cluster
.do_action(INIT_QUERY_FRAGMENTS, query_fragments, timeout)
.do_action(INIT_QUERY_FRAGMENTS, query_fragments, flight_params)
.await?;

self.set_ctx(&ctx.get_id(), ctx.clone())?;
Expand All @@ -444,7 +449,7 @@ impl DataExchangeManager {

let prepared_query = actions.prepared_query()?;
let _: HashMap<String, ()> = cluster
.do_action(START_PREPARED_QUERY, prepared_query, timeout)
.do_action(START_PREPARED_QUERY, prepared_query, flight_params)
.await?;

Ok(build_res)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use crate::servers::flight::v1::packets::QueryFragment;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct QueryFragments {
pub query_id: String,
pub fragments: Vec<QueryFragment>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use serde::Deserialize;
use serde::Serialize;

use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::servers::flight::v1::actions::INIT_QUERY_ENV;
use crate::sessions::QueryContext;
use crate::sessions::SessionManager;
Expand Down Expand Up @@ -140,7 +141,7 @@ pub struct QueryEnv {
}

impl QueryEnv {
pub async fn init(&self, ctx: &Arc<QueryContext>, timeout: u64) -> Result<()> {
pub async fn init(&self, ctx: &Arc<QueryContext>, flight_params: FlightParams) -> Result<()> {
debug!("Dataflow diagram {:?}", self.dataflow_diagram);

let cluster = ctx.get_cluster();
Expand All @@ -151,7 +152,7 @@ impl QueryEnv {
}

let _ = cluster
.do_action::<_, ()>(INIT_QUERY_ENV, message, timeout)
.do_action::<_, ()>(INIT_QUERY_ENV, message, flight_params)
.await?;

Ok(())
Expand Down
12 changes: 12 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,18 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("flight_connection_max_retry_times", DefaultSettingValue {
value: UserSettingValue::UInt64(3),
desc: "The maximum retry count for cluster flight. Disable if 0.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=10)),
}),
("flight_connection_retry_interval", DefaultSettingValue {
value: UserSettingValue::UInt64(3),
desc: "The retry interval of cluster flight is in seconds.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=30)),
}),
]);

Ok(Arc::new(DefaultSettings {
Expand Down
8 changes: 8 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,4 +825,12 @@ impl Settings {
/// Reports whether the `persist_materialized_cte` setting is switched on.
///
/// The setting is read as a `u64`; any non-zero value yields `true`.
///
/// # Errors
///
/// Propagates the error returned by `try_get_u64` when the lookup fails.
pub fn get_persist_materialized_cte(&self) -> Result<bool> {
Ok(self.try_get_u64("persist_materialized_cte")? != 0)
}

/// Returns the value of the `flight_connection_max_retry_times` setting:
/// the maximum retry count for cluster flight actions (`0` disables retries).
///
/// # Errors
///
/// Returns whatever error `try_get_u64` reports when the lookup fails.
pub fn get_flight_max_retry_times(&self) -> Result<u64> {
self.try_get_u64("flight_connection_max_retry_times")
}

/// Returns the value of the `flight_connection_retry_interval` setting:
/// the wait between cluster flight retries, expressed in seconds.
///
/// # Errors
///
/// Returns whatever error `try_get_u64` reports when the lookup fails.
pub fn get_flight_retry_interval(&self) -> Result<u64> {
self.try_get_u64("flight_connection_retry_interval")
}
}
Loading