fix(frontend): only schedule dml to cn which contain table source writer (#4329)

* fix: only schedule dml to cn contains table source writer

* fix duplicate bind

* choose

* fix

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
yezizp2012 and mergify[bot] authored Aug 1, 2022
1 parent 6bf43fe commit 650221e
Showing 5 changed files with 54 additions and 10 deletions.
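Background for the changes below: a table's vnode mapping assigns every virtual node to a parallel unit, so the distinct entries of that mapping identify the parallel units (and therefore the compute nodes) that host the table's source writer. The diff itself only calls `dedup()`, which presumably suffices because the mapping lists each parallel unit in consecutive runs; the following is a minimal sketch of that relationship using hypothetical simplified types rather than RisingWave's real ones, sorting first to make the intent explicit.

```rust
// Minimal sketch with hypothetical types; the real ParallelUnitId and vnode
// mapping live in risingwave_common and the frontend catalog.
type ParallelUnitId = u32;

/// Collapse a per-vnode mapping into the distinct parallel units owning the table.
fn owning_parallel_units(vnode_mapping: &[ParallelUnitId]) -> Vec<ParallelUnitId> {
    let mut ids = vnode_mapping.to_vec();
    ids.sort_unstable();
    ids.dedup();
    ids
}

fn main() {
    // Hypothetical mapping of eight vnodes onto three parallel units.
    let mapping = vec![0, 0, 0, 1, 1, 1, 2, 2];
    assert_eq!(owning_parallel_units(&mapping), vec![0, 1, 2]);
}
```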
13 changes: 12 additions & 1 deletion src/frontend/src/binder/insert.rs
@@ -14,7 +14,7 @@

use itertools::Itertools;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, ParallelUnitId};
use risingwave_sqlparser::ast::{Ident, ObjectName, Query, SetExpr};

use super::{BoundQuery, BoundSetExpr};
@@ -26,6 +26,9 @@ pub struct BoundInsert {
/// Used for injecting deletion chunks to the source.
pub table_source: BoundTableSource,

/// Used for scheduling.
pub vnode_mapping: Option<Vec<ParallelUnitId>>,

pub source: BoundQuery,

/// Used as part of an extra `Project` when the column types of `source` query does not match
@@ -40,6 +43,7 @@ impl Binder {
_columns: Vec<Ident>,
source: Query,
) -> Result<BoundInsert> {
let (schema_name, table_name) = Self::resolve_table_name(source_name.clone())?;
let table_source = self.bind_table_source(source_name)?;

let expected_types = table_source
@@ -48,6 +52,12 @@
.map(|c| c.data_type.clone())
.collect();

let vnode_mapping = self
.catalog
.get_table_by_name(&self.db_name, &schema_name, &table_name)?
.vnode_mapping
.clone();

// When the column types of `source` query does not match `expected_types`, casting is
// needed.
//
@@ -112,6 +122,7 @@

let insert = BoundInsert {
table_source,
vnode_mapping,
source,
cast_exprs,
};
12 changes: 10 additions & 2 deletions src/frontend/src/binder/update.rs
@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::hash_map::Entry;
use std::collections::HashMap;

use itertools::Itertools;
use risingwave_common::ensure;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::ParallelUnitId;
use risingwave_sqlparser::ast::{Assignment, Expr, TableFactor, TableWithJoins};

use super::{Binder, BoundTableSource, Relation};
@@ -29,6 +29,9 @@ pub struct BoundUpdate {
/// Used for injecting new chunks to the source.
pub table_source: BoundTableSource,

/// Used for scheduling.
pub vnode_mapping: Option<Vec<ParallelUnitId>>,

/// Used for scanning the records to update with the `selection`.
pub table: Relation,

@@ -63,7 +66,11 @@
}

let table = self.bind_vec_table_with_joins(vec![table])?.unwrap();
assert_matches!(table, Relation::BaseTable(_));
let vnode_mapping = if let Relation::BaseTable(base_table) = &table {
base_table.table_catalog.vnode_mapping.clone()
} else {
unreachable!()
};

let selection = selection.map(|expr| self.bind_expr(expr)).transpose()?;

@@ -123,6 +130,7 @@

Ok(BoundUpdate {
table_source,
vnode_mapping,
table,
selection,
exprs,
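The update binder previously only asserted that the target relation is a base table; the new `if let` both checks the variant and extracts the table catalog so its vnode mapping can be read. A rough illustration of that pattern with made-up stand-in types (not the actual binder structures):

```rust
// Illustration only: simplified stand-ins for the binder's Relation and table catalog.
struct TableCatalog {
    vnode_mapping: Option<Vec<u32>>,
}

#[allow(dead_code)]
enum Relation {
    BaseTable(TableCatalog),
    Subquery,
}

fn vnode_mapping_of(rel: &Relation) -> Option<Vec<u32>> {
    // Matching and extracting in one step replaces a bare assert on the variant.
    if let Relation::BaseTable(catalog) = rel {
        catalog.vnode_mapping.clone()
    } else {
        unreachable!("UPDATE targets are always bound as base tables")
    }
}

fn main() {
    let rel = Relation::BaseTable(TableCatalog { vnode_mapping: Some(vec![0, 1]) });
    assert_eq!(vnode_mapping_of(&rel), Some(vec![0, 1]));
}
```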
11 changes: 9 additions & 2 deletions src/frontend/src/handler/dml.rs
@@ -17,7 +17,7 @@ use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::Result;
use risingwave_sqlparser::ast::Statement;

use crate::binder::Binder;
use crate::binder::{Binder, BoundStatement};
use crate::handler::privilege::{check_privileges, resolve_privileges};
use crate::handler::util::{to_pg_field, to_pg_rows};
use crate::planner::Planner;
@@ -39,6 +39,13 @@ pub async fn handle_dml(context: OptimizerContext, stmt: Statement) -> Result<Pg
let check_items = resolve_privileges(&bound);
check_privileges(&session, &check_items)?;

let vnodes = match &bound {
BoundStatement::Insert(insert) => insert.vnode_mapping.clone(),
BoundStatement::Update(update) => update.vnode_mapping.clone(),
BoundStatement::Delete(delete) => delete.table.table_catalog.vnode_mapping.clone(),
BoundStatement::Query(_) => unreachable!(),
};

let (plan, pg_descs) = {
// Subblock to make sure PlanRef (an Rc) is dropped before `await` below.
let root = Planner::new(context.into()).plan(bound)?;
@@ -54,7 +61,7 @@
let mut rows = vec![];
#[for_await]
for chunk in query_manager
.schedule_single(execution_context, plan)
.schedule_single(execution_context, plan, vnodes)
.await?
{
rows.extend(to_pg_rows(chunk?, false));
19 changes: 18 additions & 1 deletion src/frontend/src/scheduler/distributed/query_manager.rs
@@ -17,8 +17,10 @@ use std::fmt::{Debug, Formatter};
use futures::StreamExt;
use futures_async_stream::try_stream;
use log::debug;
use rand::seq::SliceRandom;
use risingwave_common::array::DataChunk;
use risingwave_common::error::RwError;
use risingwave_common::types::ParallelUnitId;
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::{
ExchangeInfo, PlanFragment, PlanNode as BatchPlanProst, TaskId, TaskOutputId,
@@ -72,8 +74,23 @@
&self,
_context: ExecutionContextRef,
plan: BatchPlanProst,
vnodes: Option<Vec<ParallelUnitId>>,
) -> SchedulerResult<impl DataChunkStream> {
let worker_node_addr = self.worker_node_manager.next_random()?.host.unwrap();
let worker_node_addr = match vnodes {
Some(mut parallel_unit_ids) => {
parallel_unit_ids.dedup();
let candidates = self
.worker_node_manager
.get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
candidates
.choose(&mut rand::thread_rng())
.unwrap()
.clone()
.host
.unwrap()
}
None => self.worker_node_manager.next_random()?.host.unwrap(),
};

let compute_client = self
.compute_client_pool
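`get_workers_by_parallel_unit_ids` is called above but its body is not part of this diff; presumably it resolves each parallel unit to the worker node hosting it, along the lines of the `get_pu_to_worker_mapping` helper imported in the next file. A hypothetical sketch of such a lookup, with simplified types and a made-up function name rather than the real `WorkerNodeManager` code:

```rust
use std::collections::HashMap;

// Hypothetical simplified types; the real WorkerNode is a protobuf message.
type ParallelUnitId = u32;

#[derive(Clone, Debug)]
struct WorkerNode {
    id: u32,
    host: String,
}

/// Roughly what get_workers_by_parallel_unit_ids might do: resolve every parallel
/// unit to its worker and return the distinct workers, failing on unknown units.
fn workers_by_parallel_unit_ids(
    pu_to_worker: &HashMap<ParallelUnitId, WorkerNode>,
    parallel_unit_ids: &[ParallelUnitId],
) -> Result<Vec<WorkerNode>, String> {
    let mut workers: Vec<WorkerNode> = Vec::new();
    for pu in parallel_unit_ids {
        let worker = pu_to_worker
            .get(pu)
            .ok_or_else(|| format!("no worker found for parallel unit {pu}"))?;
        if !workers.iter().any(|w| w.id == worker.id) {
            workers.push(worker.clone());
        }
    }
    Ok(workers)
}

fn main() {
    let map = HashMap::from([
        (0, WorkerNode { id: 1, host: "cn-1:5688".into() }),
        (1, WorkerNode { id: 1, host: "cn-1:5688".into() }),
        (2, WorkerNode { id: 2, host: "cn-2:5688".into() }),
    ]);
    let workers = workers_by_parallel_unit_ids(&map, &[0, 1, 2]).unwrap();
    assert_eq!(workers.len(), 2); // three parallel units live on two compute nodes
}
```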
9 changes: 5 additions & 4 deletions src/frontend/src/scheduler/worker_node_manager.rs
@@ -14,7 +14,7 @@

use std::sync::{Arc, RwLock};

use rand::distributions::{Distribution as RandDistribution, Uniform};
use rand::seq::SliceRandom;
use risingwave_common::bail;
use risingwave_common::types::ParallelUnitId;
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
@@ -67,14 +67,15 @@ impl WorkerNodeManager {
/// Get a random worker node.
pub fn next_random(&self) -> SchedulerResult<WorkerNode> {
let current_nodes = self.worker_nodes.read().unwrap();
let mut rng = rand::thread_rng();
if current_nodes.is_empty() {
tracing::error!("No worker node available.");
bail!("No worker node available");
}

let die = Uniform::from(0..current_nodes.len());
Ok(current_nodes.get(die.sample(&mut rng)).unwrap().clone())
Ok(current_nodes
.choose(&mut rand::thread_rng())
.unwrap()
.clone())
}

pub fn worker_node_count(&self) -> usize {
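The `next_random` refactor should be behavior-preserving: sampling an index from `Uniform::from(0..len)` and indexing is equivalent to `SliceRandom::choose`, which picks a uniformly random element and returns `None` only for an empty slice, a case already ruled out by the emptiness check above. A minimal sketch of the idiom, assuming only the `rand` crate:

```rust
use rand::seq::SliceRandom;

fn main() {
    let nodes = vec!["cn-1:5688", "cn-2:5688", "cn-3:5688"];

    // choose() returns Option<&T>: None only when the slice is empty, which
    // mirrors the explicit emptiness check in next_random().
    match nodes.choose(&mut rand::thread_rng()) {
        Some(addr) => println!("scheduling to {addr}"),
        None => eprintln!("No worker node available"),
    }
}
```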
