feat(source): support source failover & refactor #2257

Merged
10 commits merged on May 6, 2022
6 changes: 3 additions & 3 deletions src/batch/src/executor2/delete.rs
@@ -217,10 +217,10 @@ mod tests {
// Read
let chunk = reader.next().await?;

- assert_eq!(chunk.ops().to_vec(), vec![Op::Delete; 5]);
+ assert_eq!(chunk.chunk.ops().to_vec(), vec![Op::Delete; 5]);

assert_eq!(
- chunk.columns()[0]
+ chunk.chunk.columns()[0]
.array()
.as_int64()
.iter()
@@ -229,7 +229,7 @@
);

assert_eq!(
- chunk.columns()[1]
+ chunk.chunk.columns()[1]
.array()
.as_int64()
.iter()
4 changes: 2 additions & 2 deletions src/batch/src/executor2/insert.rs
@@ -259,7 +259,7 @@ mod tests {
let chunk = reader.next().await?;

assert_eq!(
- chunk.columns()[0]
+ chunk.chunk.columns()[0]
.array()
.as_int64()
.iter()
@@ -268,7 +268,7 @@
);

assert_eq!(
- chunk.columns()[1]
+ chunk.chunk.columns()[1]
.array()
.as_int64()
.iter()
100 changes: 92 additions & 8 deletions src/connector/src/base.rs
@@ -12,15 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use itertools::Itertools;
use kafka::enumerator::KafkaSplitEnumerator;
use serde::{Deserialize, Serialize};

use crate::dummy_connector::DummySplitReader;
use crate::kafka::source::KafkaSplitReader;
use crate::kinesis::source::reader::KinesisSplitReader;
use crate::kinesis::split::KinesisOffset;
use crate::pulsar::split::PulsarOffset;

pub enum SourceOffset {
Number(i64),
@@ -47,22 +52,93 @@ pub struct SourceMessage {
}

/// The metadata of a split.
- pub trait SourceSplit: Sized {
+ pub trait SplitMetaData: Sized {
fn id(&self) -> String;
- fn to_string(&self) -> Result<String>;
+ fn to_json_bytes(&self) -> Bytes;
fn restore_from_bytes(bytes: &[u8]) -> Result<Self>;
}

/// The persistent state of the connector.
- #[derive(Debug, Clone)]
+ #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectorState {
pub identifier: Bytes,
pub start_offset: String,
pub end_offset: String,
}

impl ConnectorState {
pub fn from_hashmap(state: HashMap<String, String>) -> Vec<Self> {
if state.is_empty() {
return vec![];
}
let mut connector_states: Vec<Self> = Vec::with_capacity(state.len());
for (split, offset) in state {
connector_states.push(Self {
identifier: Bytes::from(split),
start_offset: offset.clone(),
end_offset: "".to_string(),
})
}
connector_states
}
}
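A sketch of an in-crate unit test for this conversion (the module and names are illustrative, not part of the PR). The map persists only start offsets, so `end_offset` always comes back empty:

```rust
#[cfg(test)]
mod from_hashmap_example {
    use std::collections::HashMap;

    use bytes::Bytes;

    use super::ConnectorState;

    #[test]
    fn one_state_per_split() {
        // Hypothetical persisted state: split identifier -> last committed offset.
        let mut map = HashMap::new();
        map.insert("0".to_string(), "100".to_string());

        let states = ConnectorState::from_hashmap(map);
        assert_eq!(states.len(), 1);
        assert_eq!(states[0].identifier, Bytes::from("0"));
        assert_eq!(states[0].start_offset, "100");
        // The map carries no end offsets, so the field is left empty.
        assert_eq!(states[0].end_offset, "");
    }
}
```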

impl From<SplitImpl> for ConnectorState {
fn from(split: SplitImpl) -> Self {
match split {
SplitImpl::Kafka(kafka) => Self {
identifier: Bytes::from(kafka.partition.to_string()),
start_offset: kafka.start_offset.unwrap().to_string(),
end_offset: if let Some(end_offset) = kafka.stop_offset {
end_offset.to_string()
} else {
"".to_string()
},
},
SplitImpl::Kinesis(kinesis) => Self {
identifier: Bytes::from(kinesis.shard_id),
start_offset: match kinesis.start_position {
KinesisOffset::SequenceNumber(s) => s,
_ => "".to_string(),
},
end_offset: match kinesis.end_position {
KinesisOffset::SequenceNumber(s) => s,
_ => "".to_string(),
},
},
SplitImpl::Pulsar(pulsar) => Self {
identifier: Bytes::from(pulsar.sub_topic),
start_offset: match pulsar.start_offset {
PulsarOffset::MessageID(id) => id.to_string(),
_ => "".to_string(),
},
end_offset: match pulsar.stop_offset {
PulsarOffset::MessageID(id) => id.to_string(),
_ => "".to_string(),
},
},
}
}
}

impl SplitMetaData for ConnectorState {
[Review comment, Contributor] I think ConnectorState is not a SplitMetaData? It's not going to be used by list_splits().

Suggested change:
- impl SplitMetaData for ConnectorState {
+ impl ConnectorState {

[Reply, Author] The contents of the SourceStateHandler store need to implement the trait.

fn id(&self) -> String {
String::from_utf8(self.identifier.to_vec()).unwrap()
}

fn to_json_bytes(&self) -> Bytes {
Bytes::from(serde_json::to_string(self).unwrap())
}

fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
}
}
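Since failover hinges on persisting and restoring this state, a sketch of the round trip as an in-crate test (illustrative, not part of the PR):

```rust
#[cfg(test)]
mod connector_state_roundtrip {
    use bytes::Bytes;

    use super::{ConnectorState, SplitMetaData};

    #[test]
    fn json_round_trip() {
        let state = ConnectorState {
            identifier: Bytes::from("shard-000"),
            start_offset: "100".to_string(),
            end_offset: "".to_string(),
        };
        // to_json_bytes / restore_from_bytes must invert each other,
        // otherwise a recovered source executor would lose its position.
        let restored = ConnectorState::restore_from_bytes(&state.to_json_bytes()).unwrap();
        assert_eq!(restored.id(), "shard-000");
        assert_eq!(restored.start_offset, "100");
    }
}
```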

#[derive(Debug, Clone)]
pub enum ConnectorStateV2 {
// ConnectorState should change to Vec<ConnectorState> because there can be multiple readers
// in a source executor
State(ConnectorState),
Splits(Vec<SplitImpl>),
None,
@@ -81,17 +157,25 @@ pub trait SplitReader {
pub enum SplitReaderImpl {
Kafka(KafkaSplitReader),
Kinesis(KinesisSplitReader),
Dummy(DummySplitReader),
}

impl SplitReaderImpl {
pub async fn next(&mut self) -> Result<Option<Vec<SourceMessage>>> {
match self {
Self::Kafka(r) => r.next().await,
Self::Kinesis(r) => r.next().await,
Self::Dummy(r) => r.next().await,
}
}

pub async fn create(config: Properties, state: ConnectorStateV2) -> Result<Self> {
if let ConnectorStateV2::Splits(s) = &state {
if s.is_empty() {
return Ok(Self::Dummy(DummySplitReader {}));
}
}

let upstream_type = config.get_connector_type()?;
let connector = match upstream_type.as_str() {
KAFKA_SOURCE => Self::Kafka(KafkaSplitReader::new(config, state).await?),
@@ -108,7 +192,7 @@ impl SplitReaderImpl {
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator {
- type Split: SourceSplit + Send + Sync;
+ type Split: SplitMetaData + Send + Sync;
async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
}

@@ -139,11 +223,11 @@ impl SplitImpl {
}
}

- pub fn to_string(&self) -> Result<String> {
+ pub fn to_json_bytes(&self) -> Bytes {
match self {
- SplitImpl::Kafka(k) => k.to_string(),
- SplitImpl::Pulsar(p) => p.to_string(),
- SplitImpl::Kinesis(k) => k.to_string(),
+ SplitImpl::Kafka(k) => k.to_json_bytes(),
+ SplitImpl::Pulsar(p) => p.to_json_bytes(),
+ SplitImpl::Kinesis(k) => k.to_json_bytes(),
}
}

27 changes: 27 additions & 0 deletions src/connector/src/dummy_connector.rs
@@ -0,0 +1,27 @@
use anyhow::Result;
use async_trait::async_trait;
use futures::future;

use crate::{ConnectorStateV2, Properties, SourceMessage, SplitReader};

/// [`DummySplitReader`] is a placeholder for a source executor that is assigned no splits. It
/// will wait forever when `next` is called.
#[derive(Clone, Debug)]
pub struct DummySplitReader;

#[async_trait]
impl SplitReader for DummySplitReader {
async fn next(&mut self) -> Result<Option<Vec<SourceMessage>>> {
let pending = future::pending();
let () = pending.await;

unreachable!()
}

async fn new(_properties: Properties, _state: ConnectorStateV2) -> Result<Self>
where
Self: Sized,
{
Ok(Self {})
}
}
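To illustrate the behavior the doc comment promises, a standalone sketch (simplified types, not the crate's API) of a `pending()`-based `next` that never resolves; `SplitReaderImpl::create` above falls back to exactly this kind of reader whenever the assigned split set is empty:

```rust
use std::time::Duration;

use futures::future;
use tokio::time::timeout;

// Stand-in for DummySplitReader::next: a future that never resolves.
async fn dummy_next() -> Option<Vec<u8>> {
    future::pending::<()>().await;
    unreachable!()
}

#[tokio::main]
async fn main() {
    // The dummy reader parks forever, so a timeout is the only way to
    // observe "no data, but not terminated" from the outside.
    let polled = timeout(Duration::from_millis(50), dummy_next()).await;
    assert!(polled.is_err(), "the dummy reader must never yield");
    println!("still pending after 50ms, as expected");
}
```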
13 changes: 4 additions & 9 deletions src/connector/src/filesystem/s3/source/s3_file_reader.rs
@@ -32,7 +32,7 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io;
use tokio_util::io::ReaderStream;

- use crate::base::{SourceMessage, SourceSplit, SplitReader};
+ use crate::base::{SourceMessage, SplitMetaData, SplitReader};
use crate::filesystem::file_common::{EntryStat, StatusWatch};
use crate::filesystem::s3::s3_dir::FileSystemOptError::IllegalS3FilePath;
use crate::filesystem::s3::s3_dir::{
@@ -114,18 +114,13 @@ impl S3FileSplit {
}
}

- impl SourceSplit for S3FileSplit {
+ impl SplitMetaData for S3FileSplit {
fn id(&self) -> String {
format!("{}/{}", self.bucket, self.s3_file.object.path)
}

- fn to_string(&self) -> anyhow::Result<String> {
- let split_str = serde_json::to_string(self);
- if let Ok(split) = split_str {
- Ok(split)
- } else {
- Err(anyhow::Error::from(split_str.err().unwrap()))
- }
+ fn to_json_bytes(&self) -> Bytes {
+ Bytes::from(serde_json::to_string(self).unwrap())
}

fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
9 changes: 5 additions & 4 deletions src/connector/src/kafka/split.rs
@@ -13,9 +13,10 @@
// limitations under the License.

use anyhow::anyhow;
use bytes::Bytes;
use serde::{Deserialize, Serialize};

- use crate::base::SourceSplit;
+ use crate::base::SplitMetaData;

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct KafkaSplit {
@@ -25,13 +26,13 @@ pub struct KafkaSplit {
pub(crate) stop_offset: Option<i64>,
}

- impl SourceSplit for KafkaSplit {
+ impl SplitMetaData for KafkaSplit {
fn id(&self) -> String {
format!("{}", self.partition)
}

- fn to_string(&self) -> anyhow::Result<String> {
- serde_json::to_string(self).map_err(|e| anyhow!(e))
+ fn to_json_bytes(&self) -> Bytes {
+ Bytes::from(serde_json::to_string(self).unwrap())
}

fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
9 changes: 5 additions & 4 deletions src/connector/src/kinesis/split.rs
@@ -13,9 +13,10 @@
// limitations under the License.

use anyhow::anyhow;
use bytes::Bytes;
use serde::{Deserialize, Serialize};

- use crate::base::SourceSplit;
+ use crate::base::SplitMetaData;

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum KinesisOffset {
@@ -33,13 +34,13 @@ pub struct KinesisSplit {
pub(crate) end_position: KinesisOffset,
}

- impl SourceSplit for KinesisSplit {
+ impl SplitMetaData for KinesisSplit {
fn id(&self) -> String {
self.shard_id.to_string()
}

- fn to_string(&self) -> anyhow::Result<String> {
- serde_json::to_string(self).map_err(|e| anyhow!(e))
+ fn to_json_bytes(&self) -> Bytes {
+ Bytes::from(serde_json::to_string(self).unwrap())
}

fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
@@ -36,4 +36,5 @@ mod pulsar;
mod utils;
pub use base::*;
pub use utils::{AnyhowProperties, Properties};
pub mod dummy_connector;
pub mod state;
9 changes: 5 additions & 4 deletions src/connector/src/pulsar/split.rs
@@ -13,9 +13,10 @@
// limitations under the License.

use anyhow::anyhow;
use bytes::Bytes;
use serde::{Deserialize, Serialize};

- use crate::base::SourceSplit;
+ use crate::base::SplitMetaData;

#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum PulsarOffset {
@@ -41,13 +42,13 @@ impl PulsarSplit {
}
}

- impl SourceSplit for PulsarSplit {
+ impl SplitMetaData for PulsarSplit {
fn id(&self) -> String {
self.sub_topic.clone()
}

- fn to_string(&self) -> anyhow::Result<String> {
- serde_json::to_string(self).map_err(|e| anyhow!(e))
+ fn to_json_bytes(&self) -> Bytes {
+ Bytes::from(serde_json::to_string(self).unwrap())
}

fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {