Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: reduce duplicated code on create source and create table #1665

Merged
merged 6 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion e2e_test/v2/batch_distributed/modify.slt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ select v1, v2 from t order by v2;
statement ok
delete from t where v1 = 1919;

query RI
query RI rowsort
select v1, v2 from t order by v2;
----
114 10
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl ColumnDesc {
}
}

// Get all column descs under field_descs
/// Get all column descs under `field_descs`.
pub fn get_column_descs(&self) -> Vec<ColumnDesc> {
let mut descs = vec![self.clone()];
for desc in &self.field_descs {
Expand Down
10 changes: 2 additions & 8 deletions src/frontend/src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_sqlparser::ast::{Expr, Select, SelectItem};
use super::bind_context::{Clause, ColumnBinding};
use super::UNNAMED_COLUMN;
use crate::binder::{Binder, Relation};
use crate::catalog::{is_row_id_column_name, ROWID_PREFIX};
use crate::catalog::check_valid_column_name;
use crate::expr::{Expr as _, ExprImpl, InputRef};

#[derive(Debug)]
Expand Down Expand Up @@ -127,13 +127,7 @@ impl Binder {
aliases.push(alias);
}
SelectItem::ExprWithAlias { expr, alias } => {
if is_row_id_column_name(&alias.value) {
return Err(ErrorCode::InternalError(format!(
"column name prefixed with {:?} are reserved word.",
ROWID_PREFIX
))
.into());
}
check_valid_column_name(&alias.value)?;

let expr = self.bind_expr(expr)?;
select_list.push(expr);
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/catalog/column_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::types::DataType;
use risingwave_pb::plan::ColumnCatalog as ProstColumnCatalog;

use super::row_id_column_desc;

#[derive(Debug, Clone, PartialEq)]
pub struct ColumnCatalog {
pub column_desc: ColumnDesc,
Expand Down Expand Up @@ -52,6 +54,14 @@ impl ColumnCatalog {
is_hidden: self.is_hidden,
}
}

/// Creates a row ID column (for implicit primary key).
pub fn row_id_column() -> Self {
Self {
column_desc: row_id_column_desc(),
is_hidden: true,
}
}
}

impl From<ProstColumnCatalog> for ColumnCatalog {
Expand Down
32 changes: 30 additions & 2 deletions src/frontend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::DataType;
use thiserror::Error;
pub(crate) mod catalog_service;

Expand All @@ -31,7 +33,20 @@ pub(crate) type SchemaId = u32;
pub(crate) type TableId = risingwave_common::catalog::TableId;
pub(crate) type ColumnId = risingwave_common::catalog::ColumnId;

pub const ROWID_PREFIX: &str = "_row_id";
/// Check if the column name does not conflict with the internally reserved column name.
pub fn check_valid_column_name(column_name: &str) -> Result<()> {
if is_row_id_column_name(column_name) {
Err(ErrorCode::InternalError(format!(
"column name prefixed with {:?} are reserved word.",
ROWID_PREFIX
))
.into())
} else {
Ok(())
}
}

const ROWID_PREFIX: &str = "_row_id";

pub fn gen_row_id_column_name(idx: usize) -> String {
ROWID_PREFIX.to_string() + "#" + &idx.to_string()
Expand All @@ -41,6 +56,19 @@ pub fn is_row_id_column_name(name: &str) -> bool {
name.starts_with(ROWID_PREFIX)
}

pub const TABLE_SOURCE_PK_COLID: ColumnId = ColumnId::new(0);

/// Creates a row ID column (for implicit primary key).
pub fn row_id_column_desc() -> ColumnDesc {
ColumnDesc {
data_type: DataType::Int64,
column_id: ColumnId::new(0),
name: gen_row_id_column_name(0),
field_descs: vec![],
type_name: "".to_string(),
}
}

#[derive(Error, Debug)]
pub enum CatalogError {
#[error("{0} not found: {1}")]
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use risingwave_pb::catalog::Source as ProstSource;
use risingwave_pb::stream_plan::source_node::SourceType;

use super::column_catalog::ColumnCatalog;
use super::{ColumnId, SourceId};
use crate::handler::create_table::TABLE_SOURCE_PK_COLID;
use super::{ColumnId, SourceId, TABLE_SOURCE_PK_COLID};
/// this struct `SourceCatalog` is used in frontend and compared with `ProstSource` it only maintain
/// information which will be used during optimization.
#[derive(Clone, Debug)]
Expand Down
29 changes: 4 additions & 25 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ mod tests {
use risingwave_pb::plan::{ColumnCatalog as ProstColumnCatalog, ColumnDesc as ProstColumnDesc};

use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::gen_row_id_column_name;
use crate::catalog::row_id_column_desc;
use crate::catalog::table_catalog::TableCatalog;

#[test]
Expand All @@ -171,13 +171,7 @@ mod tests {
name: "test".to_string(),
columns: vec![
ProstColumnCatalog {
column_desc: Some(ProstColumnDesc {
column_id: 0,
name: gen_row_id_column_name(0),
field_descs: vec![],
column_type: Some(DataType::Int32.to_protobuf()),
type_name: String::new(),
}),
column_desc: Some((&row_id_column_desc()).into()),
is_hidden: true,
},
ProstColumnCatalog {
Expand Down Expand Up @@ -216,16 +210,7 @@ mod tests {
associated_source_id: Some(TableId::new(233)),
name: "test".to_string(),
columns: vec![
ColumnCatalog {
column_desc: ColumnDesc {
data_type: DataType::Int32,
column_id: ColumnId::new(0),
name: gen_row_id_column_name(0),
field_descs: vec![],
type_name: String::new()
},
is_hidden: true,
},
ColumnCatalog::row_id_column(),
ColumnCatalog {
column_desc: ColumnDesc {
data_type: DataType::Struct {
Expand Down Expand Up @@ -255,13 +240,7 @@ mod tests {
}
],
pk_desc: vec![OrderedColumnDesc {
column_desc: ColumnDesc {
data_type: DataType::Int32,
column_id: ColumnId::new(0),
name: gen_row_id_column_name(0),
field_descs: vec![],
type_name: String::new()
},
column_desc: row_id_column_desc(),
order: OrderType::Ascending
}]
}
Expand Down
9 changes: 3 additions & 6 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::PgResponse;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::Result;
use risingwave_pb::catalog::Table as ProstTable;
use risingwave_sqlparser::ast::{ObjectName, Query};
Expand Down Expand Up @@ -72,10 +72,7 @@ pub async fn handle_create_mv(
.create_materialized_view(table, stream_plan)
.await?;

Ok(PgResponse::new(
pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW,
0,
vec![],
vec![],
Ok(PgResponse::empty_result(
StatementType::CREATE_MATERIALIZED_VIEW,
))
}
Loading