Skip to content

Commit

Permalink
refactor(sink): rename sink properties (risingwavelabs#4465)
Browse files Browse the repository at this point in the history
* rename sink properties

Signed-off-by: tabVersion <[email protected]>

* fix

Signed-off-by: tabVersion <[email protected]>

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tabVersion and mergify[bot] authored Aug 5, 2022
1 parent a12a9a8 commit 051cabf
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 20 deletions.
2 changes: 1 addition & 1 deletion e2e_test/batch/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
explain create index i on t(v);

statement ok
explain create sink sink_t from t with ( sink_type = 'kafka' )
explain create sink sink_t from t with ( connector = 'kafka' )

statement ok
drop table t;
2 changes: 1 addition & 1 deletion e2e_test/ddl/table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ statement ok
explain select v2 from ddl_t;

statement ok
explain create sink sink_t from ddl_t with ( sink_type = 'kafka' );
explain create sink sink_t from ddl_t with ( connector = 'kafka' );

# Create a mview with duplicated name.
statement error
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source/basic_test.slt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ statement ok
select * from s6;

statement ok
create sink si from s5 with ( kafka.brokers = '127.0.0.1:29092', kafka.topic = 'sink_target', sink.type = 'append_only', sink_type = 'kafka' )
create sink si from s5 with ( kafka.brokers = '127.0.0.1:29092', kafka.topic = 'sink_target', format = 'append_only', connector = 'kafka' )

statement ok
create materialized source s7 (v1 int, v2 varchar) with ( connector = 'kafka', kafka.topic = 'sink_target', kafka.brokers = '127.0.0.1:29092', kafka.scan.startup.mode='earliest' ) row format json
Expand Down
22 changes: 10 additions & 12 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ pub struct KafkaConfig {
// partition number. The partition number should set by meta.
pub partition: Option<i32>,

#[serde(rename = "sink.type")]
pub sink_type: String, // accept "append_only" or "debezium"
pub format: String, // accept "append_only" or "debezium"

pub identifier: String,

Expand All @@ -67,10 +66,10 @@ impl KafkaConfig {
let identifier = values
.get("identifier")
.expect("kafka.identifier must be set");
let sink_type = values.get("sink.type").expect("sink.type must be set");
if sink_type != "append_only" && sink_type != "debezium" {
let format = values.get("format").expect("format must be set");
if format != "append_only" && format != "debezium" {
return Err(SinkError::Config(
"sink.type must be set to \"append_only\" or \"debezium\"".to_string(),
"format must be set to \"append_only\" or \"debezium\"".to_string(),
));
}

Expand All @@ -84,7 +83,7 @@ impl KafkaConfig {
timeout: Duration::from_secs(5), // default timeout is 5 seconds
max_retry_num: 3, // default max retry num is 3
retry_interval: Duration::from_millis(100), // default retry interval is 100ms
sink_type: sink_type.to_string(),
format: format.to_string(),
})
}
}
Expand Down Expand Up @@ -198,12 +197,11 @@ impl Sink for KafkaSink {
if let (KafkaSinkState::Running(epoch), in_txn_epoch) = (&self.state, &self.in_transaction_epoch.unwrap()) && in_txn_epoch <= epoch {
return Ok(())
}
if self.config.sink_type.as_str() == "append_only" {
self.append_only(chunk, schema).await
} else if self.config.sink_type.as_str() == "debezium" {
todo!()
} else {
unreachable!()

match self.config.format.as_str() {
"append_only" => self.append_only(chunk, schema).await,
"debezium" => todo!(),
_ => unreachable!(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub enum SinkState {

impl SinkConfig {
pub fn from_hashmap(properties: HashMap<String, String>) -> RwResult<Self> {
const SINK_TYPE_KEY: &str = "sink_type";
const SINK_TYPE_KEY: &str = "connector";
let sink_type = properties.get(SINK_TYPE_KEY).ok_or_else(|| {
RwError::from(ErrorCode::InvalidConfigValue {
config_entry: SINK_TYPE_KEY.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub mod tests {
frontend.run_sql(sql).await.unwrap();

let sql = r#"CREATE SINK snk1 FROM mv1
WITH (sink = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table =
WITH (connector = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table =
'<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
mysql.password = '<password>');"#.to_string();
frontend.run_sql(sql).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/drop_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ mod tests {
async fn test_drop_sink_handler() {
let sql_create_table = "create table t (v1 smallint);";
let sql_create_mv = "create materialized view mv as select v1 from t;";
let sql_create_sink = "create sink snk from mv with(sink = 'mysql')";
let sql_create_sink = "create sink snk from mv with( connector = 'mysql')";
let sql_drop_sink = "drop sink snk;";
let frontend = LocalFrontend::new(Default::default()).await;
frontend.run_sql(sql_create_table).await.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/sqlparser/tests/testdata/create.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,5 @@
error_msg: |
sql parser error: Expected FROM, found: EOF
- input: CREATE SINK IF NOT EXISTS snk FROM mv WITH (sink = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table = '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>', mysql.password = '<password>')
formatted_sql: CREATE SINK IF NOT EXISTS snk FROM mv WITH (sink = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table = '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>', mysql.password = '<password>')
- input: CREATE SINK IF NOT EXISTS snk FROM mv WITH (connector = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table = '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>', mysql.password = '<password>')
formatted_sql: CREATE SINK IF NOT EXISTS snk FROM mv WITH (connector = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table = '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>', mysql.password = '<password>')

0 comments on commit 051cabf

Please sign in to comment.