Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): get pg_field_descs from boudstatement #950

Merged
merged 7 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/frontend/src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use risingwave_sqlparser::ast::{Expr, ObjectName};
use super::{BaseTableRef, Binder};
use crate::expr::ExprImpl;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BoundDelete {
pub table: BaseTableRef,
pub selection: Option<ExprImpl>,
Expand Down
2 changes: 1 addition & 1 deletion rust/frontend/src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use risingwave_sqlparser::ast::{Ident, ObjectName, Query};

use crate::binder::{BaseTableRef, Binder, BoundQuery};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BoundInsert {
pub table: BaseTableRef,
pub source: BoundQuery,
Expand Down
2 changes: 1 addition & 1 deletion rust/frontend/src/binder/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::optimizer::property::{Direction, FieldOrder};

/// A validated sql query, including order and union.
/// An example of its relationship with BoundSetExpr and BoundSelect can be found here: https://bit.ly/3GQwgPz
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BoundQuery {
pub body: BoundSetExpr,
pub order: Vec<FieldOrder>,
Expand Down
2 changes: 1 addition & 1 deletion rust/frontend/src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::bind_context::{Clause, ColumnBinding};
use crate::binder::{Binder, TableRef};
use crate::expr::{Expr as _, ExprImpl, InputRef};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BoundSelect {
pub distinct: bool,
pub select_items: Vec<ExprImpl>,
Expand Down
2 changes: 1 addition & 1 deletion rust/frontend/src/binder/set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::binder::{Binder, BoundSelect, BoundValues};

/// Part of a validated query, without order or limit clause. It may be composed of smaller
/// BoundSetExprs via set operators (e.g. union).
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum BoundSetExpr {
Select(Box<BoundSelect>),
Values(Box<BoundValues>),
Expand Down
2 changes: 1 addition & 1 deletion rust/frontend/src/binder/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use risingwave_sqlparser::ast::Statement;
use super::delete::BoundDelete;
use crate::binder::{Binder, BoundInsert, BoundQuery};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum BoundStatement {
Insert(Box<BoundInsert>),
Delete(Box<BoundDelete>),
Expand Down
4 changes: 2 additions & 2 deletions rust/frontend/src/binder/table_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::TableId;
use crate::expr::{Expr, ExprImpl};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum TableRef {
BaseTable(Box<BaseTableRef>),
Join(Box<BoundJoin>),
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BoundJoin {
pub left: TableRef,
pub right: TableRef,
Expand Down
2 changes: 1 addition & 1 deletion rust/frontend/src/binder/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::bind_context::Clause;
use crate::binder::Binder;
use crate::expr::{Expr as _, ExprImpl, ExprType, FunctionCall};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BoundValues {
pub rows: Vec<Vec<ExprImpl>>,
pub schema: Schema,
Expand Down
22 changes: 4 additions & 18 deletions rust/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::cell::RefCell;
use std::rc::Rc;

use itertools::Itertools;
use pgwire::pg_field_descriptor::{PgFieldDescriptor, TypeOid};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{ErrorCode, Result, RwError};
Expand All @@ -11,7 +9,7 @@ use risingwave_rpc_client::{ComputeClient, ExchangeSource, GrpcExchangeSource};
use risingwave_sqlparser::ast::{Query, Statement};

use crate::binder::Binder;
use crate::handler::util::to_pg_rows;
use crate::handler::util::{get_pg_field_descs, to_pg_rows};
use crate::planner::Planner;
use crate::scheduler::schedule::WorkerNodeManager;
use crate::session::QueryContext;
Expand All @@ -22,10 +20,10 @@ pub async fn handle_query(context: QueryContext, query: Box<Query>) -> Result<Pg
let catalog = catalog_mgr
.get_database_snapshot(session.database())
.ok_or_else(|| ErrorCode::InternalError(String::from("catalog not found")))?;
let mut binder = Binder::new(catalog);
let mut binder = Binder::new(catalog.clone());
let bound = binder.bind(Statement::Query(query))?;
let plan = Planner::new(Rc::new(RefCell::new(context)))
.plan(bound)?
.plan(bound.clone())?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to pass a ref to bound statement into the planner?

If we have an insert with thousands of values, I suspect this could be slow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I think it's better to take &BoundStatement for pg_field_descs. The planner needs ownership to reduce clones.

.gen_batch_query_plan()
.to_batch_prost();

Expand Down Expand Up @@ -61,19 +59,7 @@ pub async fn handle_query(context: QueryContext, query: Box<Query>) -> Result<Pg
rows.append(&mut to_pg_rows(chunk));
}

let pg_len = {
if !rows.is_empty() {
rows.get(0).unwrap().values().len() as i32
} else {
0
}
};

// TODO: from bound extract column_name and data_type to build pg_desc
let pg_descs = (0..pg_len)
.into_iter()
.map(|_i| PgFieldDescriptor::new("item".to_string(), TypeOid::Varchar))
.collect_vec();
let pg_descs = get_pg_field_descs(bound)?;

Ok(PgResponse::new(
StatementType::SELECT,
Expand Down
107 changes: 106 additions & 1 deletion rust/frontend/src/handler/util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use itertools::Itertools;
use pgwire::pg_field_descriptor::{PgFieldDescriptor, TypeOid};
use pgwire::types::Row;
use risingwave_common::array::DataChunk;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::DataType;

use crate::binder::{BoundSetExpr, BoundStatement};
use crate::expr::Expr;

pub fn to_pg_rows(chunk: DataChunk) -> Vec<Row> {
chunk
.rows()
Expand All @@ -14,13 +21,111 @@ pub fn to_pg_rows(chunk: DataChunk) -> Vec<Row> {
.collect_vec()
}

pub fn get_pg_field_descs(bound: BoundStatement) -> Result<Vec<PgFieldDescriptor>> {
if let BoundStatement::Query(query) = bound {
if let BoundSetExpr::Select(select) = query.body {
let mut pg_descs = vec![];
for i in 0..select.select_items.len() {
pg_descs.push(PgFieldDescriptor::new(
select.aliases[i].as_ref().unwrap().to_string(),
data_type_to_type_oid(select.select_items[i].return_type()),
));
}
Ok(pg_descs)
} else {
Err(ErrorCode::NotImplementedError(
"get pg_field descs only support select bound_set_expr".to_string(),
)
.into())
}
} else {
Err(ErrorCode::NotImplementedError(
"get pg_field descs only support query bound_statement".to_string(),
)
.into())
}
}

pub fn data_type_to_type_oid(data_type: DataType) -> TypeOid {
match data_type {
DataType::Int16 => TypeOid::SmallInt,
DataType::Int32 => TypeOid::Int,
DataType::Int64 => TypeOid::BigInt,
DataType::Float32 => TypeOid::Float4,
DataType::Float64 => TypeOid::Float8,
DataType::Boolean => TypeOid::Boolean,
DataType::Char => TypeOid::CharArray,
DataType::Varchar => TypeOid::Varchar,
DataType::Date => TypeOid::Date,
DataType::Time => TypeOid::Time,
DataType::Timestamp => TypeOid::Timestamp,
DataType::Timestampz => TypeOid::Timestampz,
DataType::Decimal => TypeOid::Decimal,
DataType::Interval => TypeOid::Varchar,
DataType::Struct { .. } => TypeOid::Varchar,
}
}

#[cfg(test)]
mod tests {
use itertools::Itertools;
use pgwire::pg_field_descriptor::TypeOid;
use risingwave_common::array::*;
use risingwave_common::{column, column_nonnull};

use crate::handler::util::to_pg_rows;
use crate::binder::{BoundQuery, BoundSelect, BoundSetExpr, BoundStatement};
use crate::expr::ExprImpl;
use crate::handler::util::{get_pg_field_descs, to_pg_rows};

#[test]
fn test_get_pg_field_descs() {
let select = BoundSelect {
distinct: false,
select_items: vec![
ExprImpl::literal_int(1),
ExprImpl::literal_int(2),
ExprImpl::literal_bool(true),
],
aliases: vec![
Some("column1".to_string()),
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
Some("column2".to_string()),
Some("column3".to_string()),
],
from: None,
selection: None,
};
let bound = BoundStatement::Query(
BoundQuery {
body: BoundSetExpr::Select(select.into()),
order: vec![],
}
.into(),
);
let pg_descs = get_pg_field_descs(bound).unwrap();
assert_eq!(
pg_descs
.clone()
.into_iter()
.map(|p| { p.get_name().to_string() })
.collect_vec(),
[
"column1".to_string(),
"column2".to_string(),
"column3".to_string()
]
);
assert_eq!(
pg_descs
.into_iter()
.map(|p| { p.get_type_oid().as_number() })
.collect_vec(),
[
TypeOid::Int.as_number(),
TypeOid::Int.as_number(),
TypeOid::Boolean.as_number()
]
);
}

#[test]
fn test_to_pg_rows() {
Expand Down