refactor: reduce duplicated code on create source and create table (#…
neverchanje authored Apr 7, 2022
1 parent 87b8ffc commit 81ce409
Showing 17 changed files with 170 additions and 206 deletions.
2 changes: 1 addition & 1 deletion e2e_test/v2/batch_distributed/modify.slt
@@ -28,7 +28,7 @@ select v1, v2 from t order by v2;
statement ok
delete from t where v1 = 1919;

-query RI
+query RI rowsort
select v1, v2 from t order by v2;
----
114 10
2 changes: 1 addition & 1 deletion src/common/src/catalog/column.rs
@@ -102,7 +102,7 @@ impl ColumnDesc {
}
}

-// Get all column descs under field_descs
+/// Get all column descs under `field_descs`.
pub fn get_column_descs(&self) -> Vec<ColumnDesc> {
let mut descs = vec![self.clone()];
for desc in &self.field_descs {
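
A note on the method above: it flattens a possibly nested column into itself plus all of its nested field descriptors, which is how a struct-typed column such as "country" contributes entries like "country.zipcode" in the tests further down. The loop body is cut off in this diff, so the following is only a minimal stand-alone sketch of the recursion with a simplified stand-in struct and an assumed `extend` call, not the real risingwave_common::catalog::ColumnDesc:

// Simplified stand-in for ColumnDesc, for illustration only.
#[derive(Clone, Debug)]
struct Desc {
    name: String,
    field_descs: Vec<Desc>,
}

impl Desc {
    /// Returns this descriptor followed by all nested field descriptors, depth-first.
    fn get_column_descs(&self) -> Vec<Desc> {
        let mut descs = vec![self.clone()];
        for desc in &self.field_descs {
            // Assumed continuation of the loop shown in the diff above.
            descs.extend(desc.get_column_descs());
        }
        descs
    }
}

fn main() {
    let country = Desc {
        name: "country".to_string(),
        field_descs: vec![
            Desc { name: "country.zipcode".to_string(), field_descs: vec![] },
            Desc { name: "country.address".to_string(), field_descs: vec![] },
        ],
    };
    let names: Vec<String> = country.get_column_descs().into_iter().map(|d| d.name).collect();
    // Prints ["country", "country.zipcode", "country.address"].
    println!("{:?}", names);
}
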
10 changes: 2 additions & 8 deletions src/frontend/src/binder/select.rs
@@ -22,7 +22,7 @@ use risingwave_sqlparser::ast::{Expr, Select, SelectItem};
use super::bind_context::{Clause, ColumnBinding};
use super::UNNAMED_COLUMN;
use crate::binder::{Binder, Relation};
-use crate::catalog::{is_row_id_column_name, ROWID_PREFIX};
+use crate::catalog::check_valid_column_name;
use crate::expr::{Expr as _, ExprImpl, InputRef};

#[derive(Debug)]
@@ -127,13 +127,7 @@ impl Binder {
aliases.push(alias);
}
SelectItem::ExprWithAlias { expr, alias } => {
-if is_row_id_column_name(&alias.value) {
-return Err(ErrorCode::InternalError(format!(
-"column name prefixed with {:?} are reserved word.",
-ROWID_PREFIX
-))
-.into());
-}
+check_valid_column_name(&alias.value)?;

let expr = self.bind_expr(expr)?;
select_list.push(expr);
10 changes: 10 additions & 0 deletions src/frontend/src/catalog/column_catalog.rs
@@ -18,6 +18,8 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::types::DataType;
use risingwave_pb::plan::ColumnCatalog as ProstColumnCatalog;

+use super::row_id_column_desc;

#[derive(Debug, Clone, PartialEq)]
pub struct ColumnCatalog {
pub column_desc: ColumnDesc,
@@ -52,6 +54,14 @@ impl ColumnCatalog {
is_hidden: self.is_hidden,
}
}

+/// Creates a row ID column (for implicit primary key).
+pub fn row_id_column() -> Self {
+Self {
+column_desc: row_id_column_desc(),
+is_hidden: true,
+}
+}
}

impl From<ProstColumnCatalog> for ColumnCatalog {
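
For reference, ColumnCatalog::row_id_column() replaces the hand-written row-id ColumnCatalog literals that the handlers and tests built before this commit. An illustrative fragment of how it is used (it relies on the frontend crate's own types, so it is meant to compile inside src/frontend rather than stand alone; to_protobuf is the conversion used by the create_source handler later in this diff):

fn example_row_id_column() {
    use crate::catalog::column_catalog::ColumnCatalog;

    // The implicit row id column is hidden from users and carries the generated "_row_id#0" name.
    let row_id = ColumnCatalog::row_id_column();
    assert!(row_id.is_hidden);

    // Protobuf form, as passed to the catalog service when a source is registered.
    let _prost_row_id = row_id.to_protobuf();
}
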
32 changes: 30 additions & 2 deletions src/frontend/src/catalog/mod.rs
@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use risingwave_common::error::{ErrorCode, RwError};
+use risingwave_common::catalog::ColumnDesc;
+use risingwave_common::error::{ErrorCode, Result, RwError};
+use risingwave_common::types::DataType;
use thiserror::Error;
pub(crate) mod catalog_service;

@@ -31,7 +33,20 @@ pub(crate) type SchemaId = u32;
pub(crate) type TableId = risingwave_common::catalog::TableId;
pub(crate) type ColumnId = risingwave_common::catalog::ColumnId;

-pub const ROWID_PREFIX: &str = "_row_id";
+/// Check if the column name does not conflict with the internally reserved column name.
+pub fn check_valid_column_name(column_name: &str) -> Result<()> {
+if is_row_id_column_name(column_name) {
+Err(ErrorCode::InternalError(format!(
+"column name prefixed with {:?} are reserved word.",
+ROWID_PREFIX
+))
+.into())
+} else {
+Ok(())
+}
+}

+const ROWID_PREFIX: &str = "_row_id";

pub fn gen_row_id_column_name(idx: usize) -> String {
ROWID_PREFIX.to_string() + "#" + &idx.to_string()
@@ -41,6 +56,19 @@ pub fn is_row_id_column_name(name: &str) -> bool {
name.starts_with(ROWID_PREFIX)
}

+pub const TABLE_SOURCE_PK_COLID: ColumnId = ColumnId::new(0);

+/// Creates a row ID column (for implicit primary key).
+pub fn row_id_column_desc() -> ColumnDesc {
+ColumnDesc {
+data_type: DataType::Int64,
+column_id: ColumnId::new(0),
+name: gen_row_id_column_name(0),
+field_descs: vec![],
+type_name: "".to_string(),
+}
+}

#[derive(Error, Debug)]
pub enum CatalogError {
#[error("{0} not found: {1}")]
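
To make the contracts of these new helpers concrete, here is a small test-style sketch; the assertions are inferred from the definitions above, and it would live inside src/frontend (e.g. in this module's tests):

use risingwave_common::types::DataType;

use crate::catalog::{
    check_valid_column_name, gen_row_id_column_name, is_row_id_column_name, row_id_column_desc,
};

#[test]
fn row_id_helpers_are_consistent() {
    // The generated row id name carries the reserved "_row_id" prefix.
    assert_eq!(gen_row_id_column_name(0), "_row_id#0");
    assert!(is_row_id_column_name(&gen_row_id_column_name(0)));

    // User-provided names that collide with the prefix are rejected by the binder.
    assert!(check_valid_column_name("_row_id#1").is_err());
    assert!(check_valid_column_name("v1").is_ok());

    // The implicit row id column is a 64-bit integer named "_row_id#0".
    let desc = row_id_column_desc();
    assert_eq!(desc.data_type, DataType::Int64);
    assert_eq!(desc.name, gen_row_id_column_name(0));
}
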
3 changes: 1 addition & 2 deletions src/frontend/src/catalog/source_catalog.rs
@@ -16,8 +16,7 @@ use risingwave_pb::catalog::Source as ProstSource;
use risingwave_pb::stream_plan::source_node::SourceType;

use super::column_catalog::ColumnCatalog;
-use super::{ColumnId, SourceId};
-use crate::handler::create_table::TABLE_SOURCE_PK_COLID;
+use super::{ColumnId, SourceId, TABLE_SOURCE_PK_COLID};
/// this struct `SourceCatalog` is used in frontend and compared with `ProstSource` it only maintain
/// information which will be used during optimization.
#[derive(Clone, Debug)]
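
Side note: TABLE_SOURCE_PK_COLID now lives in the catalog module next to the row-id helpers instead of in the create_table handler. It names column id 0, i.e. the implicit row id column, which is also why handle_create_source later in this diff fills pk_column_ids with a single 0. A tiny sketch of that relationship (assuming ColumnId implements PartialEq; to be placed inside src/frontend):

#[test]
fn table_source_pk_is_the_row_id_column() {
    use crate::catalog::{row_id_column_desc, TABLE_SOURCE_PK_COLID};

    // The table-source primary key is the hidden row id column generated at column id 0.
    assert_eq!(row_id_column_desc().column_id, TABLE_SOURCE_PK_COLID);
}
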
29 changes: 4 additions & 25 deletions src/frontend/src/catalog/table_catalog.rs
@@ -159,7 +159,7 @@ mod tests {
use risingwave_pb::plan::{ColumnCatalog as ProstColumnCatalog, ColumnDesc as ProstColumnDesc};

use crate::catalog::column_catalog::ColumnCatalog;
-use crate::catalog::gen_row_id_column_name;
+use crate::catalog::row_id_column_desc;
use crate::catalog::table_catalog::TableCatalog;

#[test]
@@ -171,13 +171,7 @@
name: "test".to_string(),
columns: vec![
ProstColumnCatalog {
-column_desc: Some(ProstColumnDesc {
-column_id: 0,
-name: gen_row_id_column_name(0),
-field_descs: vec![],
-column_type: Some(DataType::Int32.to_protobuf()),
-type_name: String::new(),
-}),
+column_desc: Some((&row_id_column_desc()).into()),
is_hidden: true,
},
ProstColumnCatalog {
@@ -216,16 +210,7 @@
associated_source_id: Some(TableId::new(233)),
name: "test".to_string(),
columns: vec![
-ColumnCatalog {
-column_desc: ColumnDesc {
-data_type: DataType::Int32,
-column_id: ColumnId::new(0),
-name: gen_row_id_column_name(0),
-field_descs: vec![],
-type_name: String::new()
-},
-is_hidden: true,
-},
+ColumnCatalog::row_id_column(),
ColumnCatalog {
column_desc: ColumnDesc {
data_type: DataType::Struct {
@@ -255,13 +240,7 @@
}
],
pk_desc: vec![OrderedColumnDesc {
-column_desc: ColumnDesc {
-data_type: DataType::Int32,
-column_id: ColumnId::new(0),
-name: gen_row_id_column_name(0),
-field_descs: vec![],
-type_name: String::new()
-},
+column_desc: row_id_column_desc(),
order: OrderType::Ascending
}]
}
9 changes: 3 additions & 6 deletions src/frontend/src/handler/create_mv.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use pgwire::pg_response::PgResponse;
+use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::Result;
use risingwave_pb::catalog::Table as ProstTable;
use risingwave_sqlparser::ast::{ObjectName, Query};
@@ -72,10 +72,7 @@ pub async fn handle_create_mv(
.create_materialized_view(table, stream_plan)
.await?;

-Ok(PgResponse::new(
-pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW,
-0,
-vec![],
-vec![],
+Ok(PgResponse::empty_result(
+StatementType::CREATE_MATERIALIZED_VIEW,
))
}
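
Both this handler and handle_create_source below now return PgResponse::empty_result(...). Judging from the constructor call it replaces in this commit, it is presumably shorthand for a response with zero affected rows and no row values or field descriptors, along these lines (a hedged sketch of the call site, not pgwire's internals):

use pgwire::pg_response::{PgResponse, StatementType};

fn create_mv_response() -> PgResponse {
    // Before this commit the handler built the same response explicitly:
    //     PgResponse::new(StatementType::CREATE_MATERIALIZED_VIEW, 0, vec![], vec![])
    PgResponse::empty_result(StatementType::CREATE_MATERIALIZED_VIEW)
}
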
140 changes: 60 additions & 80 deletions src/frontend/src/handler/create_source.rs
@@ -16,26 +16,48 @@ use std::collections::HashMap;

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_common::catalog::DEFAULT_SCHEMA_NAME;
use risingwave_common::error::Result;
-use risingwave_common::types::DataType;
use risingwave_pb::catalog::source::Info;
use risingwave_pb::catalog::{Source as ProstSource, StreamSourceInfo};
-use risingwave_pb::plan::{ColumnCatalog, ColumnDesc, RowFormatType};
+use risingwave_pb::plan::{ColumnCatalog as ProstColumnCatalog, RowFormatType};
use risingwave_source::ProtobufParser;
-use risingwave_sqlparser::ast::{CreateSourceStatement, ProtobufSchema, SourceSchema};
+use risingwave_sqlparser::ast::{CreateSourceStatement, ObjectName, ProtobufSchema, SourceSchema};

-use crate::binder::expr::bind_data_type;
-use crate::catalog::gen_row_id_column_name;
-use crate::session::OptimizerContext;
+use super::create_table::{bind_sql_columns, gen_materialized_source_plan};
+use crate::binder::Binder;
+use crate::catalog::column_catalog::ColumnCatalog;
+use crate::session::{OptimizerContext, SessionImpl};

-fn extract_protobuf_table_schema(schema: &ProtobufSchema) -> Result<Vec<ColumnCatalog>> {
+pub(crate) fn make_prost_source(
+session: &SessionImpl,
+name: ObjectName,
+source_info: Info,
+) -> Result<ProstSource> {
+let (schema_name, name) = Binder::resolve_table_name(name)?;

+let (database_id, schema_id) = session
+.env()
+.catalog_reader()
+.read_guard()
+.check_relation_name_duplicated(session.database(), &schema_name, &name)?;

+Ok(ProstSource {
+id: 0,
+schema_id,
+database_id,
+name,
+info: Some(source_info),
+})
+}

+/// Map a protobuf schema to a relational schema.
+fn extract_protobuf_table_schema(schema: &ProtobufSchema) -> Result<Vec<ProstColumnCatalog>> {
let parser = ProtobufParser::new(&schema.row_schema_location.0, &schema.message_name.0)?;
let column_descs = parser.map_to_columns()?;

Ok(column_descs
.into_iter()
-.map(|col| ColumnCatalog {
+.map(|col| ProstColumnCatalog {
column_desc: Some(col),
is_hidden: false,
})
@@ -44,90 +66,48 @@ fn extract_protobuf_table_schema(schema: &ProtobufSchema) -> Result<Vec<ColumnCa

pub(super) async fn handle_create_source(
context: OptimizerContext,
+is_materialized: bool,
stmt: CreateSourceStatement,
) -> Result<PgResponse> {
-let session = context.session_ctx;

-let schema_name = DEFAULT_SCHEMA_NAME;
-let source_name = stmt.source_name.value.clone();

-// pre add row_id catalog
-let mut column_catalogs = vec![ColumnCatalog {
-column_desc: Some(ColumnDesc {
-column_id: 0,
-name: gen_row_id_column_name(0),
-column_type: Some(DataType::Int32.to_protobuf()),
-field_descs: vec![],
-type_name: "".to_string(),
-}),
-is_hidden: true,
-}];

-let (database_id, schema_id) = session
-.env()
-.catalog_reader()
-.read_guard()
-.check_relation_name_duplicated(session.database(), schema_name, &source_name)?;

let source = match &stmt.source_schema {
SourceSchema::Protobuf(protobuf_schema) => {
-column_catalogs.extend(extract_protobuf_table_schema(protobuf_schema)?.into_iter());
+let mut columns = vec![ColumnCatalog::row_id_column().to_protobuf()];
+columns.extend(extract_protobuf_table_schema(protobuf_schema)?.into_iter());
StreamSourceInfo {
properties: HashMap::from(stmt.with_properties),
row_format: RowFormatType::Protobuf as i32,
row_schema_location: protobuf_schema.row_schema_location.0.clone(),
row_id_index: 0,
-columns: column_catalogs,
-pk_column_ids: vec![0],
-}
-}
-SourceSchema::Json => {
-column_catalogs.append(
-&mut stmt
-.columns
-.into_iter()
-.enumerate()
-.map(|(idx, col)| {
-Ok(ColumnCatalog {
-column_desc: Some(ColumnDesc {
-column_id: (idx + 1) as i32,
-name: col.name.to_string(),
-column_type: Some(bind_data_type(&col.data_type)?.to_protobuf()),
-field_descs: vec![],
-type_name: "".to_string(),
-}),
-is_hidden: false,
-})
-})
-.collect::<Result<Vec<ColumnCatalog>>>()?,
-);
-StreamSourceInfo {
-properties: HashMap::from(stmt.with_properties),
-row_format: RowFormatType::Json as i32,
-row_schema_location: "".to_string(),
-row_id_index: 0,
-columns: column_catalogs,
+columns,
pk_column_ids: vec![0],
}
}
+SourceSchema::Json => StreamSourceInfo {
+properties: HashMap::from(stmt.with_properties),
+row_format: RowFormatType::Json as i32,
+row_schema_location: "".to_string(),
+row_id_index: 0,
+columns: bind_sql_columns(stmt.columns)?,
+pk_column_ids: vec![0],
+},
};

+let session = context.session_ctx.clone();
+let source = make_prost_source(&session, stmt.source_name, Info::StreamSource(source))?;
let catalog_writer = session.env().catalog_writer();
-catalog_writer
-.create_source(ProstSource {
-id: 0,
-schema_id,
-database_id,
-name: source_name.clone(),
-info: Some(Info::StreamSource(source)),
-})
-.await?;

-Ok(PgResponse::new(
-StatementType::CREATE_SOURCE,
-0,
-vec![],
-vec![],
-))
+if is_materialized {
+let (plan, table) = {
+let (plan, table) = gen_materialized_source_plan(context.into(), source.clone())?;
+let plan = plan.to_stream_prost();
+(plan, table)
+};
+catalog_writer
+.create_materialized_source(source, table, plan)
+.await?;
+} else {
+catalog_writer.create_source(source).await?;
+}
+Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
}

#[cfg(test)]
@@ -215,7 +195,7 @@ pub mod tests {
};
let row_id_col_name = gen_row_id_column_name(0);
let expected_columns = maplit::hashmap! {
-row_id_col_name.as_str() => DataType::Int32,
+row_id_col_name.as_str() => DataType::Int64,
"id" => DataType::Int32,
"country.zipcode" => DataType::Varchar,
"zipcode" => DataType::Int64,
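
Note that the Json branch above now delegates column binding to bind_sql_columns, which is shared with the create-table handler; its definition is not part of this diff. Based on the inline code it replaces, it presumably prepends the hidden row id column and then binds the user-declared columns starting at column id 1, roughly as in this hedged sketch (bind_sql_columns_sketch and the ColumnDef parameter type are assumptions, not the actual helper):

use risingwave_common::error::Result;
use risingwave_pb::plan::{ColumnCatalog as ProstColumnCatalog, ColumnDesc as ProstColumnDesc};
use risingwave_sqlparser::ast::ColumnDef;

use crate::binder::expr::bind_data_type;
use crate::catalog::column_catalog::ColumnCatalog;

/// Hedged sketch of the shared helper: the hidden row id column first, then the user-declared
/// columns bound from the SQL definition, with column ids starting at 1.
pub fn bind_sql_columns_sketch(columns: Vec<ColumnDef>) -> Result<Vec<ProstColumnCatalog>> {
    let mut catalogs = vec![ColumnCatalog::row_id_column().to_protobuf()];
    catalogs.extend(
        columns
            .into_iter()
            .enumerate()
            .map(|(idx, col)| {
                Ok(ProstColumnCatalog {
                    column_desc: Some(ProstColumnDesc {
                        column_id: (idx + 1) as i32,
                        name: col.name.to_string(),
                        column_type: Some(bind_data_type(&col.data_type)?.to_protobuf()),
                        field_descs: vec![],
                        type_name: "".to_string(),
                    }),
                    is_hidden: false,
                })
            })
            .collect::<Result<Vec<ProstColumnCatalog>>>()?,
    );
    Ok(catalogs)
}
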
(Diffs for the remaining changed files were not loaded on this page.)
