Skip to content

Commit

Permalink
feat: Add to_local method to generate local execution lan (#2335)
Browse files Browse the repository at this point in the history
* feat: Add to_local method to generate local execution
  • Loading branch information
liurenjie1024 authored May 6, 2022
1 parent 1f401f5 commit 5f3c6d4
Show file tree
Hide file tree
Showing 16 changed files with 142 additions and 3 deletions.
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_pb::plan_common::TableRefId;
use super::{
LogicalDelete, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch,
};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order};

/// `BatchDelete` implements [`LogicalDelete`]
Expand Down Expand Up @@ -79,3 +80,10 @@ impl ToBatchProst for BatchDelete {
})
}
}

impl ToLocalBatch for BatchDelete {
fn to_local(&self) -> PlanRef {
let new_input = self.input().to_local();
self.clone_with_input(new_input).into()
}
}
7 changes: 7 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{ExchangeNode, MergeSortExchangeNode};

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order};

/// `BatchExchange` imposes a particular distribution on its input
Expand Down Expand Up @@ -84,3 +85,9 @@ impl ToBatchProst for BatchExchange {
}
}
}

impl ToLocalBatch for BatchExchange {
fn to_local(&self) -> PlanRef {
unreachable!()
}
}
10 changes: 9 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use risingwave_pb::batch_plan::FilterNode;

use super::{LogicalFilter, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::expr::{Expr, ExprImpl};
use crate::optimizer::plan_node::PlanBase;
use crate::optimizer::plan_node::{PlanBase, ToLocalBatch};
use crate::optimizer::property::Order;
use crate::utils::Condition;

/// `BatchFilter` implements [`super::LogicalFilter`]
Expand Down Expand Up @@ -81,3 +82,10 @@ impl ToBatchProst for BatchFilter {
})
}
}

impl ToLocalBatch for BatchFilter {
fn to_local(&self) -> PlanRef {
let new_input = self.input().to_local_with_order_required(Order::any());
self.clone_with_input(new_input).into()
}
}
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_pb::batch_plan::HashAggNode;
use super::logical_agg::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::expr::InputRefDisplay;
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -110,3 +111,10 @@ impl ToBatchProst for BatchHashAgg {
})
}
}

impl ToLocalBatch for BatchHashAgg {
fn to_local(&self) -> PlanRef {
let new_input = self.input().to_local_with_order_required(Order::any());
self.clone_with_input(new_input).into()
}
}
10 changes: 10 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::{
EqJoinPredicate, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchProst,
ToDistributedBatch,
};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order};
use crate::utils::ColIndexMapping;

Expand Down Expand Up @@ -144,3 +145,12 @@ impl ToBatchProst for BatchHashJoin {
})
}
}

impl ToLocalBatch for BatchHashJoin {
fn to_local(&self) -> PlanRef {
let left = self.left().to_local();
let right = self.right().to_local();

self.clone_with_left_right(left, right).into()
}
}
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_pb::batch_plan::HopWindowNode;
use super::{
LogicalHopWindow, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch,
};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order};

/// `BatchHopWindow` implements [`super::LogicalHopWindow`] to evaluate specified expressions on
Expand Down Expand Up @@ -100,3 +101,10 @@ impl ToBatchProst for BatchHopWindow {
})
}
}

impl ToLocalBatch for BatchHopWindow {
fn to_local(&self) -> PlanRef {
let new_input = self.input().to_local();
self.clone_with_input(new_input).into()
}
}
9 changes: 8 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_pb::batch_plan::InsertNode;
use risingwave_pb::plan_common::TableRefId;

use super::{LogicalInsert, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::PlanBase;
use crate::optimizer::plan_node::{PlanBase, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order};

/// `BatchInsert` implements [`LogicalInsert`]
Expand Down Expand Up @@ -81,3 +81,10 @@ impl ToBatchProst for BatchInsert {
})
}
}

impl ToLocalBatch for BatchInsert {
fn to_local(&self) -> PlanRef {
let new_input = self.input().to_local();
self.clone_with_input(new_input).into()
}
}
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::LimitNode;

use super::{LogicalLimit, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::ToLocalBatch;

/// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -75,3 +76,10 @@ impl ToBatchProst for BatchLimit {
})
}
}

impl ToLocalBatch for BatchLimit {
fn to_local(&self) -> PlanRef {
let new_input = self.input().to_local();
self.clone_with_input(new_input).into()
}
}
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use super::{
LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch,
};
use crate::expr::Expr;
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order};

/// `BatchProject` implements [`super::LogicalProject`] to evaluate specified expressions on input
Expand Down Expand Up @@ -108,3 +109,10 @@ impl ToBatchProst for BatchProject {
NodeBody::Project(ProjectNode { select_list })
}
}

impl ToLocalBatch for BatchProject {
fn to_local(&self) -> PlanRef {
let new_input = self.input().to_local_with_order_required(Order::any());
self.clone_with_input(new_input).into()
}
}
8 changes: 7 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_pb::batch_plan::RowSeqScanNode;
use risingwave_pb::plan_common::{CellBasedTableDesc, ColumnDesc as ProstColumnDesc};

use super::{PlanBase, PlanRef, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::LogicalScan;
use crate::optimizer::plan_node::{LogicalScan, ToLocalBatch};
use crate::optimizer::property::{Distribution, Order};

/// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table
Expand Down Expand Up @@ -90,3 +90,9 @@ impl ToBatchProst for BatchSeqScan {
})
}
}

impl ToLocalBatch for BatchSeqScan {
fn to_local(&self) -> PlanRef {
todo!()
}
}
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_pb::batch_plan::SortAggNode;

use super::logical_agg::PlanAggCall;
use super::{LogicalAgg, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -87,3 +88,10 @@ impl ToBatchProst for BatchSimpleAgg {
})
}
}

impl ToLocalBatch for BatchSimpleAgg {
fn to_local(&self) -> PlanRef {
let new_input = self.input().to_local_with_order_required(Order::any());
self.clone_with_input(new_input).into()
}
}
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::OrderByNode;

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::Order;

/// `BatchSort` buffers all data from input and sort these rows by specified order, providing the
Expand Down Expand Up @@ -68,3 +69,10 @@ impl ToBatchProst for BatchSort {
NodeBody::OrderBy(OrderByNode { column_orders })
}
}

impl ToLocalBatch for BatchSort {
fn to_local(&self) -> PlanRef {
let new_input = self.input().to_local();
self.clone_with_input(new_input).into()
}
}
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::TopNNode;

use super::{LogicalTopN, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order};

/// `BatchTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap
Expand Down Expand Up @@ -84,3 +85,10 @@ impl ToBatchProst for BatchTopN {
})
}
}

impl ToLocalBatch for BatchTopN {
fn to_local(&self) -> PlanRef {
let new_input = self.input().to_local();
self.clone_with_input(new_input).into()
}
}
7 changes: 7 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_pb::batch_plan::ValuesNode;

use super::{LogicalValues, PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchProst, ToDistributedBatch};
use crate::expr::{Expr, ExprImpl};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -141,3 +142,9 @@ mod tests {
);
}
}

impl ToLocalBatch for BatchValues {
fn to_local(&self) -> PlanRef {
Self::with_dist(self.logical().clone(), Distribution::Single).into()
}
}
29 changes: 29 additions & 0 deletions src/frontend/src/optimizer/plan_node/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ pub trait ToBatch {
}
}

/// Converts a batch physical plan to local plan for local execution.
///
/// This is quite similar to `ToBatch`, but different in several ways. For example it converts
/// scan to exchange + scan.
pub trait ToLocalBatch {
fn to_local(&self) -> PlanRef;

/// Convert the plan to batch local physical plan and satisfy the required Order
fn to_local_with_order_required(&self, required_order: &Order) -> PlanRef {
let ret = self.to_local();
required_order.enforce_if_not_satisfies(ret)
}
}

/// `ToDistributedBatch` allows to convert a batch physical plan to distributed batch plan, by
/// insert exchange node, with an optional required order and distributed.
///
Expand Down Expand Up @@ -141,3 +155,18 @@ macro_rules! ban_to_distributed {
}
for_logical_plan_nodes! { ban_to_distributed }
for_stream_plan_nodes! { ban_to_distributed }

/// impl `ToLocalBatch` for logical and streaming node.
macro_rules! ban_to_local {
([], $( { $convention:ident, $name:ident }),*) => {
paste!{
$(impl ToLocalBatch for [<$convention $name>] {
fn to_local(&self) -> PlanRef {
panic!("converting to distributed is only allowed on batch plan")
}
})*
}
}
}
for_logical_plan_nodes! { ban_to_local }
for_stream_plan_nodes! { ban_to_local }
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub trait PlanNode:
+ ToStream
+ ToDistributedBatch
+ ToProst
+ ToLocalBatch
{
fn node_type(&self) -> PlanNodeType;
fn plan_base(&self) -> &PlanBase;
Expand Down

0 comments on commit 5f3c6d4

Please sign in to comment.