Skip to content

Commit

Permalink
feat(frontend): support explain verbose (risingwavelabs#3798)
Browse files Browse the repository at this point in the history
* feat(frontend) support explain verbose

* prefix each field name with table_name

* do-apply-planner-test

* minor improvement

* fix batch two phase agg which should satisfy all agg-call contain no distinct and order by

* fix stream two phase agg which should satisfy all agg-call contain no distinct and order by

* move explain_verbose flag from session config to optimizer context

* fmt

* fix test case
  • Loading branch information
chenzl25 authored Jul 13, 2022
1 parent 4a5503e commit edc6660
Show file tree
Hide file tree
Showing 57 changed files with 1,223 additions and 314 deletions.
2 changes: 1 addition & 1 deletion src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use async_trait::async_trait;
pub use column::*;
pub use internal_table::*;
pub use physical_table::*;
pub use schema::{test_utils as schema_test_utils, Field, Schema};
pub use schema::{test_utils as schema_test_utils, Field, FieldVerboseDisplay, Schema};

use crate::array::Row;
pub use crate::config::constant::hummock;
Expand Down
23 changes: 23 additions & 0 deletions src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ impl Field {
}
}

pub struct FieldVerboseDisplay<'a>(pub &'a Field);

impl std::fmt::Debug for FieldVerboseDisplay<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.name)
}
}

impl std::fmt::Display for FieldVerboseDisplay<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.name)
}
}

/// `schema_unnamed` builds a `Schema` with the given types, but without names.
#[macro_export]
macro_rules! schema_unnamed {
Expand Down Expand Up @@ -176,6 +190,15 @@ impl Field {
pub fn data_type(&self) -> DataType {
self.data_type.clone()
}

pub fn from_with_table_name_prefix(desc: &ColumnDesc, table_name: &str) -> Self {
Self {
data_type: desc.data_type.clone(),
name: format!("{}.{}", table_name, desc.name),
sub_fields: desc.field_descs.iter().map(|d| d.into()).collect_vec(),
type_name: desc.type_name.clone(),
}
}
}

impl From<&ProstField> for Field {
Expand Down
105 changes: 104 additions & 1 deletion src/frontend/src/expr/function_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

use itertools::Itertools;
use num_integer::Integer as _;
use risingwave_common::catalog::Schema;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::DataType;

use super::{align_types, cast_ok, infer_type, CastContext, Expr, ExprImpl, Literal};
use crate::expr::ExprType;
use crate::expr::{ExprType, ExprVerboseDisplay};

#[derive(Clone, Eq, PartialEq, Hash)]
pub struct FunctionCall {
Expand Down Expand Up @@ -259,3 +260,105 @@ impl Expr for FunctionCall {
}
}
}

pub struct FunctionCallVerboseDisplay<'a> {
pub function_call: &'a FunctionCall,
pub input_schema: &'a Schema,
}

impl std::fmt::Debug for FunctionCallVerboseDisplay<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let that = self.function_call;
match &that.func_type {
ExprType::Cast => {
assert_eq!(that.inputs.len(), 1);
ExprVerboseDisplay {
expr: &that.inputs[0],
input_schema: self.input_schema,
}
.fmt(f)?;
write!(f, "::{:?}", that.return_type)
}
ExprType::Add => explain_verbose_binary_op(f, "+", &that.inputs, self.input_schema),
ExprType::Subtract => {
explain_verbose_binary_op(f, "-", &that.inputs, self.input_schema)
}
ExprType::Multiply => {
explain_verbose_binary_op(f, "*", &that.inputs, self.input_schema)
}
ExprType::Divide => explain_verbose_binary_op(f, "/", &that.inputs, self.input_schema),
ExprType::Modulus => explain_verbose_binary_op(f, "%", &that.inputs, self.input_schema),
ExprType::Equal => explain_verbose_binary_op(f, "=", &that.inputs, self.input_schema),
ExprType::NotEqual => {
explain_verbose_binary_op(f, "<>", &that.inputs, self.input_schema)
}
ExprType::LessThan => {
explain_verbose_binary_op(f, "<", &that.inputs, self.input_schema)
}
ExprType::LessThanOrEqual => {
explain_verbose_binary_op(f, "<=", &that.inputs, self.input_schema)
}
ExprType::GreaterThan => {
explain_verbose_binary_op(f, ">", &that.inputs, self.input_schema)
}
ExprType::GreaterThanOrEqual => {
explain_verbose_binary_op(f, ">=", &that.inputs, self.input_schema)
}
ExprType::And => explain_verbose_binary_op(f, "AND", &that.inputs, self.input_schema),
ExprType::Or => explain_verbose_binary_op(f, "OR", &that.inputs, self.input_schema),
ExprType::BitwiseShiftLeft => {
explain_verbose_binary_op(f, "<<", &that.inputs, self.input_schema)
}
ExprType::BitwiseShiftRight => {
explain_verbose_binary_op(f, ">>", &that.inputs, self.input_schema)
}
ExprType::BitwiseAnd => {
explain_verbose_binary_op(f, "&", &that.inputs, self.input_schema)
}
ExprType::BitwiseOr => {
explain_verbose_binary_op(f, "|", &that.inputs, self.input_schema)
}
ExprType::BitwiseXor => {
explain_verbose_binary_op(f, "#", &that.inputs, self.input_schema)
}
_ => {
let func_name = format!("{:?}", that.func_type);
let mut builder = f.debug_tuple(&func_name);
that.inputs.iter().for_each(|child| {
builder.field(&ExprVerboseDisplay {
expr: child,
input_schema: self.input_schema,
});
});
builder.finish()
}
}
}
}

fn explain_verbose_binary_op(
f: &mut std::fmt::Formatter<'_>,
op: &str,
inputs: &[ExprImpl],
input_schema: &Schema,
) -> std::fmt::Result {
use std::fmt::Debug;

assert_eq!(inputs.len(), 2);

write!(f, "(")?;
ExprVerboseDisplay {
expr: &inputs[0],
input_schema,
}
.fmt(f)?;
write!(f, " {} ", op)?;
ExprVerboseDisplay {
expr: &inputs[1],
input_schema,
}
.fmt(f)?;
write!(f, ")")?;

Ok(())
}
35 changes: 35 additions & 0 deletions src/frontend/src/expr/input_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_pb::expr::agg_call::Arg as ProstAggCallArg;
use risingwave_pb::expr::InputRefExpr;
Expand Down Expand Up @@ -53,6 +54,40 @@ impl fmt::Debug for InputRefDisplay {
}
}

#[derive(Clone, Copy)]
pub struct InputRefVerboseDisplay<'a> {
pub input_ref: &'a InputRef,
pub input_schema: &'a Schema,
}

impl fmt::Display for InputRefVerboseDisplay<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
self.input_schema
.fields
.get(self.input_ref.index)
.unwrap()
.name
)
}
}

impl fmt::Debug for InputRefVerboseDisplay<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
self.input_schema
.fields
.get(self.input_ref.index)
.unwrap()
.name
)
}
}

impl fmt::Display for InputRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", InputRefDisplay(self.index))
Expand Down
41 changes: 39 additions & 2 deletions src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ mod utils;

pub use agg_call::{AggCall, AggOrderBy, AggOrderByExpr};
pub use correlated_input_ref::CorrelatedInputRef;
pub use function_call::FunctionCall;
pub use input_ref::{as_alias_display, input_ref_to_column_indices, InputRef, InputRefDisplay};
pub use function_call::{FunctionCall, FunctionCallVerboseDisplay};
pub use input_ref::{
as_alias_display, input_ref_to_column_indices, InputRef, InputRefDisplay,
InputRefVerboseDisplay,
};
pub use literal::Literal;
pub use subquery::{Subquery, SubqueryKind};

Expand Down Expand Up @@ -489,6 +492,39 @@ impl std::fmt::Debug for ExprImpl {
}
}

pub struct ExprVerboseDisplay<'a> {
pub expr: &'a ExprImpl,
pub input_schema: &'a Schema,
}

impl std::fmt::Debug for ExprVerboseDisplay<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let that = self.expr;
match that {
ExprImpl::InputRef(x) => write!(
f,
"{:?}",
InputRefVerboseDisplay {
input_ref: x,
input_schema: self.input_schema
}
),
ExprImpl::Literal(x) => write!(f, "{:?}", x),
ExprImpl::FunctionCall(x) => write!(
f,
"{:?}",
FunctionCallVerboseDisplay {
function_call: x,
input_schema: self.input_schema
}
),
ExprImpl::AggCall(x) => write!(f, "{:?}", x),
ExprImpl::Subquery(x) => write!(f, "{:?}", x),
ExprImpl::CorrelatedInputRef(x) => write!(f, "{:?}", x),
}
}
}

#[cfg(test)]
/// Asserts that the expression is an [`InputRef`] with the given index.
macro_rules! assert_eq_input_ref {
Expand All @@ -502,6 +538,7 @@ macro_rules! assert_eq_input_ref {

#[cfg(test)]
pub(crate) use assert_eq_input_ref;
use risingwave_common::catalog::Schema;

use crate::utils::Condition;

Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::Ordering;

use pgwire::pg_field_descriptor::{PgFieldDescriptor, TypeOid};
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Row;
Expand All @@ -29,12 +31,12 @@ use crate::session::OptimizerContext;
pub(super) fn handle_explain(
context: OptimizerContext,
stmt: Statement,
_verbose: bool,
verbose: bool,
) -> Result<PgResponse> {
let session = context.session_ctx.clone();
context.explain_verbose.store(verbose, Ordering::Release);
// bind, plan, optimize, and serialize here
let mut planner = Planner::new(context.into());

let plan = match stmt {
Statement::CreateView {
or_replace: false,
Expand Down
30 changes: 24 additions & 6 deletions src/frontend/src/optimizer/plan_node/batch_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use risingwave_pb::batch_plan::{ExchangeNode, MergeSortExchangeNode};

use super::{PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order};
use crate::optimizer::property::{
Distribution, DistributionVerboseDisplay, Order, OrderVerboseDisplay,
};

/// `BatchExchange` imposes a particular distribution on its input
/// without changing its content.
Expand All @@ -42,11 +44,27 @@ impl BatchExchange {

impl fmt::Display for BatchExchange {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"BatchExchange {{ order: {}, dist: {:?} }}",
self.base.order, self.base.dist
)
let verbose = self.base.ctx.is_explain_verbose();
if verbose {
write!(
f,
"BatchExchange {{ order: {}, dist: {} }}",
OrderVerboseDisplay {
order: &self.base.order,
input_schema: self.input.schema()
},
DistributionVerboseDisplay {
distribution: &self.base.dist,
input_schema: self.input.schema()
}
)
} else {
write!(
f,
"BatchExchange {{ order: {}, dist: {:?} }}",
self.base.order, self.base.dist
)
}
}
}

Expand Down
24 changes: 19 additions & 5 deletions src/frontend/src/optimizer/plan_node/batch_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::FieldVerboseDisplay;
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::expand_node::Subset;
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand Down Expand Up @@ -47,15 +48,28 @@ impl BatchExpand {
pub fn column_subsets(&self) -> &Vec<Vec<usize>> {
self.logical.column_subsets()
}

pub fn column_subsets_verbose_display(&self) -> Vec<Vec<FieldVerboseDisplay>> {
self.logical.column_subsets_verbose_display()
}
}

impl fmt::Display for BatchExpand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"BatchExpand {{ column_subsets: {:?} }}",
self.column_subsets()
)
let verbose = self.base.ctx.is_explain_verbose();
if verbose {
write!(
f,
"BatchExpand {{ column_subsets: {:?} }}",
self.column_subsets_verbose_display()
)
} else {
write!(
f,
"BatchExpand {{ column_subsets: {:?} }}",
self.column_subsets()
)
}
}
}

Expand Down
Loading

0 comments on commit edc6660

Please sign in to comment.