Skip to content

Commit

Permalink
Merge pull request #8717 from xudong963/remove_11_09
Browse files Browse the repository at this point in the history
refactor: remove useless code in cluster
  • Loading branch information
mergify[bot] authored Nov 9, 2022
2 parents 199a370 + 63f085b commit 8866cce
Show file tree
Hide file tree
Showing 7 changed files with 3 additions and 143 deletions.
1 change: 0 additions & 1 deletion src/query/service/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub use rpc::MergeExchange;
pub use rpc::PrecommitBlock;
pub use rpc::QueryFragmentsPlanPacket;
pub use rpc::ServerFlightExchange;
pub use rpc::ShuffleDataExchange;
pub use rpc::ShuffleDataExchangeV2;
pub use rpc_service::RpcService;

Expand Down
21 changes: 0 additions & 21 deletions src/query/service/src/api/rpc/exchange/data_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use common_sql::executor::PhysicalScalar;
pub enum DataExchange {
Merge(MergeExchange),
Broadcast(BroadcastExchange),
ShuffleDataExchange(ShuffleDataExchange),
ShuffleDataExchangeV2(ShuffleDataExchangeV2),
}

Expand All @@ -27,39 +26,19 @@ impl DataExchange {
match self {
DataExchange::Merge(exchange) => vec![exchange.destination_id.clone()],
DataExchange::Broadcast(exchange) => exchange.destination_ids.clone(),
DataExchange::ShuffleDataExchange(exchange) => exchange.destination_ids.clone(),
DataExchange::ShuffleDataExchangeV2(exchange) => exchange.destination_ids.clone(),
}
}

pub fn from_multiple_nodes(&self) -> bool {
match self {
DataExchange::Merge(_) => true,
DataExchange::ShuffleDataExchange(_) => true,
DataExchange::ShuffleDataExchangeV2(_) => true,
DataExchange::Broadcast(exchange) => exchange.from_multiple_nodes,
}
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ShuffleDataExchange {
pub destination_ids: Vec<String>,
pub exchange_expression: PhysicalScalar,
}

impl ShuffleDataExchange {
pub fn create(
destination_ids: Vec<String>,
exchange_expression: PhysicalScalar,
) -> DataExchange {
DataExchange::ShuffleDataExchange(ShuffleDataExchange {
destination_ids,
exchange_expression,
})
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ShuffleDataExchangeV2 {
pub destination_ids: Vec<String>,
Expand Down
26 changes: 3 additions & 23 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use crate::api::rpc::exchange::statistics_receiver::StatisticsReceiver;
use crate::api::rpc::exchange::statistics_sender::StatisticsSender;
use crate::api::rpc::flight_client::FlightExchange;
use crate::api::rpc::flight_scatter_broadcast::BroadcastFlightScatter;
use crate::api::rpc::flight_scatter_hash::HashFlightScatter;
use crate::api::rpc::flight_scatter_hash_v2::HashFlightScatterV2;
use crate::api::rpc::Packet;
use crate::api::DataExchange;
Expand Down Expand Up @@ -493,7 +492,7 @@ impl QueryCoordinator {
return Ok(fragment_coordinator.pipeline_build_res.unwrap());
}

let exchange_params = fragment_coordinator.create_exchange_params(ctx, info)?;
let exchange_params = fragment_coordinator.create_exchange_params(info)?;
let mut build_res = fragment_coordinator.pipeline_build_res.unwrap();

let data_exchange = fragment_coordinator.data_exchange.as_ref().unwrap();
Expand Down Expand Up @@ -538,7 +537,7 @@ impl QueryCoordinator {

let mut params = Vec::with_capacity(self.fragments_coordinator.len());
for coordinator in self.fragments_coordinator.values() {
params.push(coordinator.create_exchange_params(&info.query_ctx, info)?);
params.push(coordinator.create_exchange_params(info)?);
}

for ((_, coordinator), params) in self.fragments_coordinator.iter_mut().zip(params) {
Expand Down Expand Up @@ -614,11 +613,7 @@ impl FragmentCoordinator {
})
}

pub fn create_exchange_params(
&self,
ctx: &Arc<QueryContext>,
info: &QueryInfo,
) -> Result<ExchangeParams> {
pub fn create_exchange_params(&self, info: &QueryInfo) -> Result<ExchangeParams> {
match &self.data_exchange {
None => Err(ErrorCode::Internal("Cannot find data exchange.")),
Some(DataExchange::Merge(exchange)) => {
Expand All @@ -641,21 +636,6 @@ impl FragmentCoordinator {
)?)),
}))
}
Some(DataExchange::ShuffleDataExchange(exchange)) => {
Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams {
schema: self.payload.schema()?,
fragment_id: self.fragment_id,
query_id: info.query_id.to_string(),
executor_id: info.current_executor.to_string(),
destination_ids: exchange.destination_ids.to_owned(),
shuffle_scatter: Arc::new(Box::new(HashFlightScatter::try_create(
ctx.clone(),
self.payload.schema()?,
Some(exchange.exchange_expression.clone()),
exchange.destination_ids.len(),
)?)),
}))
}
Some(DataExchange::ShuffleDataExchangeV2(exchange)) => {
Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams {
schema: self.payload.schema()?,
Expand Down
1 change: 0 additions & 1 deletion src/query/service/src/api/rpc/exchange/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,5 @@ mod statistics_sender;
pub use data_exchange::BroadcastExchange;
pub use data_exchange::DataExchange;
pub use data_exchange::MergeExchange;
pub use data_exchange::ShuffleDataExchange;
pub use data_exchange::ShuffleDataExchangeV2;
pub use exchange_manager::DataExchangeManager;
94 changes: 0 additions & 94 deletions src/query/service/src/api/rpc/flight_scatter_hash.rs

This file was deleted.

2 changes: 0 additions & 2 deletions src/query/service/src/api/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ mod flight_actions;
mod flight_client;
mod flight_scatter;
mod flight_scatter_broadcast;
mod flight_scatter_hash;
mod flight_scatter_hash_v2;
mod flight_service;
mod packets;
Expand All @@ -31,7 +30,6 @@ pub use exchange::BroadcastExchange;
pub use exchange::DataExchange;
pub use exchange::DataExchangeManager;
pub use exchange::MergeExchange;
pub use exchange::ShuffleDataExchange;
pub use exchange::ShuffleDataExchangeV2;
pub use flight_client::ClientFlightExchange;
pub use flight_client::ServerFlightExchange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ impl<'a> Display for QueryFragmentActionsWrap<'a> {
match data_exchange {
DataExchange::Merge(_) => writeln!(f, " DataExchange: Merge")?,
DataExchange::Broadcast(_) => writeln!(f, " DataExchange: Broadcast")?,
DataExchange::ShuffleDataExchange(_) => writeln!(f, " DataExchange: Shuffle")?,
DataExchange::ShuffleDataExchangeV2(_) => writeln!(f, " DataExchange: Shuffle")?,
}
}
Expand Down

1 comment on commit 8866cce

@vercel
Copy link

@vercel vercel bot commented on 8866cce Nov 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-databend.vercel.app
databend-git-main-databend.vercel.app
databend.vercel.app
databend.rs

Please sign in to comment.