Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Apr 8, 2024
1 parent 1d45ad0 commit 3881de9
Showing 1 changed file with 27 additions and 24 deletions.
51 changes: 27 additions & 24 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ pub struct RedisCommon {
pub url: String,
}
pub enum RedisConn {
// Redis deployed as a cluster, clusters with only one node should also use this conn
Cluster(ClusterConnection),
// Redis is not deployed as a cluster
Single(MultiplexedConnection),
}

Expand Down Expand Up @@ -87,34 +89,35 @@ impl ConnectionLike for RedisConn {

impl RedisCommon {
pub async fn build_conn(&self) -> ConnectorResult<RedisConn> {
let url: Value =
serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e)))?;
match url {
Value::String(s) => {
let client = RedisClient::open(s)?;
match serde_json::from_str(&self.url).map_err(|e| SinkError::Config(anyhow!(e))) {
Ok(v) => {
if let Value::Array(list) = v {
let list = list
.into_iter()
.map(|s| {
if let Value::String(s) = s {
Ok(s)
} else {
Err(SinkError::Redis(
"redis.url must be array of string".to_string(),
)
.into())
}
})
.collect::<ConnectorResult<Vec<String>>>()?;

let client = ClusterClient::new(list)?;
Ok(RedisConn::Cluster(client.get_async_connection().await?))
} else {
Err(SinkError::Redis("redis.url must be array or string".to_string()).into())
}
}
Err(_) => {
let client = RedisClient::open(self.url.clone())?;
Ok(RedisConn::Single(
client.get_multiplexed_async_connection().await?,
))
}
Value::Array(list) => {
let list = list
.into_iter()
.map(|s| {
if let Value::String(s) = s {
Ok(s)
} else {
Err(
SinkError::Redis("redis.url must be array of string".to_string())
.into(),
)
}
})
.collect::<ConnectorResult<Vec<String>>>()?;

let client = ClusterClient::new(list)?;
Ok(RedisConn::Cluster(client.get_async_connection().await?))
}
_ => Err(SinkError::Redis("redis.url must be array or string".to_string()).into()),
}
}
}
Expand Down

0 comments on commit 3881de9

Please sign in to comment.