Skip to content

Commit

Permalink
feat(connector): SinkExecutor init + MySQLSink mvp (#2969)
Browse files Browse the repository at this point in the history
* SinkExecutor init

* MySQLSink init

* lock

* add very rudimental implementation of MySQLSink write_batch()

* cargo fmt

* Refactor MySQLSink::write_batch() and improve test

* Add MySQLType

* Add DELETE to write_batch() and improve test

* Refactor

* Improve test

* Integrate mysql_async::Error into RwError

* Implement Update operation in write_batch()

* Add closures

* Add UPDATE to basic test

* Fix ./risedev test

* Update src/frontend/test_runner/tests/gen/testcases.rs

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* apply clippy

* Improve test

* commit

* Implement mysql_async::Value

* Add scalars and tests

* Clean up

* Apply ./risedev check

* SinkExecutor init

* MySQLSink init

* lock

* add very rudimental implementation of MySQLSink write_batch()

* cargo fmt

* Refactor MySQLSink::write_batch() and improve test

* Add MySQLType

* Add DELETE to write_batch() and improve test

* Refactor

* Improve test

* Integrate mysql_async::Error into RwError

* Implement Update operation in write_batch()

* Add closures

* Add UPDATE to basic test

* Fix ./risedev test

* Update src/frontend/test_runner/tests/gen/testcases.rs

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* apply clippy

* Improve test

* commit

* Implement mysql_async::Value

* Add scalars and tests

* Clean up

* Apply ./risedev check

* reintroduce MySQLError

* Organize sink files

* Remove testcases.rs

* Add SinkConfig

* impl Sink for SinkImpl

* Introduce SinkError

* Merge main

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Tao Wu <[email protected]>
  • Loading branch information
3 people authored Jun 13, 2022
1 parent 1b749b3 commit b7537bb
Show file tree
Hide file tree
Showing 11 changed files with 854 additions and 9 deletions.
383 changes: 382 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ pub enum ErrorCode {
},
#[error("Invalid Parameter Value: {0}")]
InvalidParameterValue(String),
#[error("MySQL error: {0}")]
SinkError(BoxedError),

/// This error occurs when the meta node receives heartbeat from a previous removed worker
/// node. Currently we don't support re-register, and the worker node need a full restart.
Expand Down Expand Up @@ -334,6 +336,7 @@ impl ErrorCode {
ErrorCode::ExprError(_) => 28,
ErrorCode::ArrayError(_) => 29,
ErrorCode::SchedulerError(_) => 30,
ErrorCode::SinkError(_) => 31,
ErrorCode::UnknownError(_) => 101,
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub use native_type::*;
use risingwave_pb::data::data_type::IntervalType::*;
use risingwave_pb::data::data_type::{IntervalType, TypeName};
pub use scalar_impl::*;
mod chrono_wrapper;
mod decimal;
pub mod chrono_wrapper;
pub mod decimal;
pub mod interval;

mod ordered_float;
Expand Down
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ log = "0.4"
madsim = "=0.2.0-alpha.3"
maplit = "1.0.2"
memcomparable = { path = "../utils/memcomparable" }
mysql_async = "0.30"
num-traits = "0.2"
paste = "1"
prost = "0.10"
Expand Down Expand Up @@ -61,5 +62,6 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }

[dev-dependencies]
rand = "0.8"
rust_decimal = "1"
tempfile = "3"
wiremock = "0.5"
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub use base::*;
pub mod aws_utils;
pub mod dummy_connector;
mod macros;
pub mod sink;
pub mod state;

pub use base::ConnectorState;
Expand Down
73 changes: 73 additions & 0 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod mysql;
pub mod redis;

use async_trait::async_trait;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::{ErrorCode, RwError};
use thiserror::Error;

use crate::sink::mysql::{MySQLConfig, MySQLSink};
use crate::sink::redis::{RedisConfig, RedisSink};

#[async_trait]
pub trait Sink {
async fn write_batch(&mut self, chunk: StreamChunk, schema: &Schema) -> Result<()>;
}

pub enum SinkConfig {
Mysql(MySQLConfig),
Redis(RedisConfig),
}

pub enum SinkImpl {
MySQL(MySQLSink),
Redis(RedisSink),
}

impl SinkImpl {
fn new(cfg: SinkConfig) -> Self {
match cfg {
SinkConfig::Mysql(cfg) => SinkImpl::MySQL(MySQLSink::new(cfg)),
SinkConfig::Redis(cfg) => SinkImpl::Redis(RedisSink::new(cfg)),
}
}
}

#[async_trait]
impl Sink for SinkImpl {
async fn write_batch(&mut self, chunk: StreamChunk, schema: &Schema) -> Result<()> {
match self {
SinkImpl::MySQL(sink) => sink.write_batch(chunk, schema).await,
SinkImpl::Redis(sink) => sink.write_batch(chunk, schema).await,
}
}
}

pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
#[error(transparent)]
MySQL(#[from] mysql_async::Error),
}

impl From<SinkError> for RwError {
fn from(e: SinkError) -> Self {
ErrorCode::SinkError(Box::new(e)).into()
}
}
248 changes: 248 additions & 0 deletions src/connector/src/sink/mysql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use async_trait::async_trait;
use itertools::{join, Itertools};
use mysql_async::prelude::*;
use mysql_async::*;
use risingwave_common::array::Op::*;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::types::{Datum, Decimal, ScalarImpl};

use crate::sink::{Result, Sink, SinkError};

pub struct MySQLConfig {
pub endpoint: String,
pub table: String,
pub database: Option<String>,
pub user: Option<String>,
pub password: Option<String>,
}

// Primitive design of MySQLSink
#[allow(dead_code)]
pub struct MySQLSink {
cfg: MySQLConfig,
}

impl MySQLSink {
pub fn new(cfg: MySQLConfig) -> Self {
Self { cfg }
}

fn endpoint(&self) -> String {
self.cfg.endpoint.clone()
}

fn table(&self) -> String {
self.cfg.table.clone()
}

fn database(&self) -> Option<String> {
self.cfg.database.clone()
}

fn user(&self) -> Option<String> {
self.cfg.user.clone()
}

fn password(&self) -> Option<String> {
self.cfg.password.clone()
}
}

struct MySQLValue(Value);

impl fmt::Display for MySQLValue {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0.as_sql(true))
}
}

impl TryFrom<Datum> for MySQLValue {
type Error = SinkError;

fn try_from(datum: Datum) -> Result<MySQLValue> {
if let Some(scalar) = datum {
match scalar {
ScalarImpl::Int16(v) => Ok(MySQLValue(v.into())),
ScalarImpl::Int32(v) => Ok(MySQLValue(v.into())),
ScalarImpl::Int64(v) => Ok(MySQLValue(v.into())),
ScalarImpl::Float32(v) => Ok(MySQLValue(f32::from(v).into())),
ScalarImpl::Float64(v) => Ok(MySQLValue(f64::from(v).into())),
ScalarImpl::Bool(v) => Ok(MySQLValue(v.into())),
ScalarImpl::Decimal(Decimal::Normalized(v)) => Ok(MySQLValue(v.into())),
ScalarImpl::Decimal(_) => panic!("NaN, -inf, +inf are not supported by MySQL"),
ScalarImpl::Utf8(v) => Ok(MySQLValue(v.into())),
ScalarImpl::NaiveDate(v) => Ok(MySQLValue(format!("{}", v).into())),
ScalarImpl::NaiveTime(v) => Ok(MySQLValue(format!("{}", v).into())),
ScalarImpl::NaiveDateTime(v) => Ok(MySQLValue(format!("{}", v).into())),
// ScalarImpl::Interval(v) => Ok(MySQLValue(Value::NULL)),
_ => unimplemented!(),
}
} else {
Ok(MySQLValue(Value::NULL))
}
}
}

#[async_trait]
impl Sink for MySQLSink {
async fn write_batch(&mut self, chunk: StreamChunk, schema: &Schema) -> Result<()> {
// Closure that takes an idx to create a vector of MySQLValues from a StreamChunk 'row'.
let values = |idx| -> Result<Vec<MySQLValue>> {
chunk
.columns()
.iter()
.map(|x| MySQLValue::try_from(x.array_ref().datum_at(idx)))
.collect_vec()
.into_iter()
.collect()
};

// Closure that builds a String containing WHERE conditions, i.e. 'v1=1 AND v2=2'.
// Perhaps better to replace this functionality with a new Sink trait method.
let conditions = |values: Vec<MySQLValue>| {
schema
.names()
.iter()
.zip_eq(values.iter())
.map(|(c, v)| format!("{}={}", c, v))
.collect::<Vec<String>>()
};

// Build a connection and start transaction
let endpoint = self.endpoint();
let mut endpoint = endpoint.split(':');
let mut builder = OptsBuilder::default()
.user(self.user())
.pass(self.password())
.ip_or_hostname(endpoint.next().unwrap())
.db_name(self.database());
// TODO(nanderstabel): Fix ParseIntError
if let Some(port) = endpoint.next() {
builder = builder.tcp_port(port.parse().unwrap());
}

let mut conn = Conn::new(builder).await?;
let mut transaction = conn.start_transaction(TxOpts::default()).await?;

let mut iter = chunk.ops().iter().enumerate();
while let Some((idx, op)) = iter.next() {
// Get SQL statement
let stmt = match *op {
Insert => format!(
"INSERT INTO {} VALUES ({});",
self.table(),
join(values(idx)?, ",")
),
Delete => format!(
"DELETE FROM {} WHERE ({});",
self.table(),
join(conditions(values(idx)?), " AND ")
),
UpdateDelete => {
if let Some((idx2, UpdateInsert)) = iter.next() {
format!(
"UPDATE {} SET {} WHERE {};",
self.table(),
join(conditions(values(idx2)?), ","),
join(conditions(values(idx)?), " AND ")
)
} else {
panic!("UpdateDelete should always be followed by an UpdateInsert!")
}
}
_ => panic!("UpdateInsert should always follow an UpdateDelete!"),
};
transaction.exec_drop(stmt, Params::Empty).await?;
}

// Commit and drop the connection.
transaction.commit().await?;
drop(conn);
Ok(())
}
}

#[cfg(test)]
mod test {
use risingwave_common::types::chrono_wrapper::*;
use rust_decimal::Decimal as RustDecimal;

use super::*;

struct ConnectionParams<'a> {
pub endpoint: &'a str,
pub table: &'a str,
pub database: &'a str,
pub user: &'a str,
pub password: &'a str,
}

#[test]
fn test_date() {
assert_eq!(
MySQLValue::try_from(Some(ScalarImpl::NaiveDate(NaiveDateWrapper::default())))
.unwrap()
.to_string(),
"'1970-01-01'"
);
}

#[test]
fn test_time() {
assert_eq!(
MySQLValue::try_from(Some(ScalarImpl::NaiveTime(NaiveTimeWrapper::default())))
.unwrap()
.to_string(),
"'00:00:00'"
);
}

#[test]
fn test_datetime() {
assert_eq!(
MySQLValue::try_from(Some(ScalarImpl::NaiveDateTime(
NaiveDateTimeWrapper::default()
)))
.unwrap()
.to_string(),
"'1970-01-01 00:00:00'"
);
}

#[test]
fn test_decimal() {
assert_eq!(
MySQLValue::try_from(Some(ScalarImpl::Decimal(Decimal::Normalized(
RustDecimal::new(0, 0)
))))
.unwrap()
.to_string(),
"'0'"
);
assert_eq!(
MySQLValue::try_from(Some(ScalarImpl::Decimal(Decimal::Normalized(
RustDecimal::new(124, 5)
))))
.unwrap()
.to_string(),
"'0.00124'"
);
}
}
Loading

0 comments on commit b7537bb

Please sign in to comment.