feat: scheduling splits when creating a materialized view (#1788)
* add register source in source manager

Signed-off-by: Shanicky Chen <[email protected]>

* add tick for source manager

Signed-off-by: Peng Chen <[email protected]>

* inject split assign mutation

Signed-off-by: Peng Chen <[email protected]>

* fix connector code

Signed-off-by: Peng Chen <[email protected]>

* add unregister

Signed-off-by: Peng Chen <[email protected]>

* add stop for source discovery

Signed-off-by: Peng Chen <[email protected]>

* tmp

Signed-off-by: Peng Chen <[email protected]>

* add patch for stream manager

Signed-off-by: Peng Chen <[email protected]>

* fix kafka split enumerator

Signed-off-by: Peng Chen <[email protected]>

* add stream_source_splits in source executor

Signed-off-by: Peng Chen <[email protected]>

* rename to stream source state

Signed-off-by: Peng Chen <[email protected]>

* remove useless code

Signed-off-by: Peng Chen <[email protected]>

Co-authored-by: Shanicky Chen <[email protected]>
shanicky and Shanicky Chen authored Apr 12, 2022
1 parent 6b68bf1 commit 62aba2a
Showing 29 changed files with 510 additions and 212 deletions.
34 changes: 17 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default.

10 changes: 10 additions & 0 deletions proto/stream_plan.proto
@@ -14,6 +14,7 @@ message ActorMapping {
repeated uint32 hash_mapping = 1;
}

// todo: StreamSourceNode or TableSourceNode
message SourceNode {
enum SourceType {
TABLE = 0;
@@ -22,6 +23,15 @@ message SourceNode {
plan.TableRefId table_ref_id = 1;
repeated int32 column_ids = 2;
SourceType source_type = 3;
// Split allocation information. In the future this will distinguish between
// `StreamSource` and `TableSource`, so that fields that are not common to both
// do not all need to live in the same SourceNode structure.
StreamSourceState stream_source_state = 4;
}

message StreamSourceState {
string split_type = 1;
repeated bytes stream_source_splits = 2;
}

message ProjectNode {
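
As a rough sketch of how the new field might be consumed on the compute side: the serialized splits carried in stream_source_splits can be decoded back into SplitImpl values with the restore_from_bytes helper added in src/connector/src/base.rs further down in this diff. The stand-in StreamSourceState struct below mirrors what prost would generate for the message above; the risingwave_connector crate path is an assumption, not part of this commit.

// Illustrative sketch, not part of the diff: decoding the splits carried in
// StreamSourceState back into connector splits. `StreamSourceState` is a
// stand-in for the prost-generated struct of the proto message above; the
// `risingwave_connector::base` path for SplitImpl is an assumption.
use anyhow::Result;
use risingwave_connector::base::SplitImpl;

pub struct StreamSourceState {
    pub split_type: String,
    pub stream_source_splits: Vec<Vec<u8>>,
}

pub fn decode_assigned_splits(state: &StreamSourceState) -> Result<Vec<SplitImpl>> {
    state
        .stream_source_splits
        .iter()
        .map(|bytes| SplitImpl::restore_from_bytes(state.split_type.clone(), bytes))
        .collect()
}
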
1 change: 0 additions & 1 deletion src/compute/src/rpc/service/stream_service.rs
@@ -219,7 +219,6 @@ impl StreamServiceImpl {
.create_source_v2(&id, info.to_owned())
.await?;
}

Info::TableSource(info) => {
let columns = info
.columns
1 change: 1 addition & 0 deletions src/compute/tests/table_v2_materialize.rs
@@ -161,6 +161,7 @@ async fn test_table_v2_materialize() -> Result<()> {
1,
"SourceExecutor".to_string(),
Arc::new(StreamingMetrics::unused()),
vec![],
)?;

// Create a `Materialize` to write the changes to storage
80 changes: 63 additions & 17 deletions src/connector/src/base.rs
@@ -29,12 +29,15 @@ pub enum SourceOffset {
String(String),
}

use crate::pulsar::PulsarSplitEnumerator;
use crate::{kafka, pulsar};
use crate::kafka::KafkaSplit;
use crate::kinesis::split::KinesisSplit;
use crate::pulsar::{PulsarSplit, PulsarSplitEnumerator};
use crate::{kafka, kinesis, pulsar};

const UPSTREAM_SOURCE_KEY: &str = "connector";
const KAFKA_SOURCE: &str = "kafka";
const KINESIS_SOURCE: &str = "kinesis";
const PULSAR_SOURCE: &str = "pulsar";

pub trait SourceMessage {
fn payload(&self) -> Result<Option<&[u8]>>;
@@ -49,9 +52,11 @@ pub struct InnerMessage {
pub split_id: String,
}

pub trait SourceSplit {
pub trait SourceSplit: Sized {
fn id(&self) -> String;
fn to_string(&self) -> Result<String>;
fn restore_from_bytes(bytes: &[u8]) -> Result<Self>;
fn get_type(&self) -> String;
}

#[derive(Debug, Clone)]
@@ -76,13 +81,55 @@ pub trait SplitEnumerator {
}

pub enum SplitEnumeratorImpl {
Kafka(KafkaSplitEnumerator),
Kafka(kafka::enumerator::KafkaSplitEnumerator),
Pulsar(pulsar::enumerator::PulsarSplitEnumerator),
Kinesis(kinesis::enumerator::client::KinesisSplitEnumerator),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SplitImpl {
Kafka(kafka::KafkaSplit),
Pulsar(pulsar::PulsarSplit),
Kinesis(kinesis::split::KinesisSplit),
}

impl SplitImpl {
pub fn id(&self) -> String {
match self {
SplitImpl::Kafka(k) => k.id(),
SplitImpl::Pulsar(p) => p.id(),
SplitImpl::Kinesis(k) => k.id(),
}
}

pub fn to_string(&self) -> Result<String> {
match self {
SplitImpl::Kafka(k) => k.to_string(),
SplitImpl::Pulsar(p) => p.to_string(),
SplitImpl::Kinesis(k) => k.to_string(),
}
}

pub fn get_type(&self) -> String {
match self {
SplitImpl::Kafka(k) => k.get_type(),
SplitImpl::Pulsar(p) => p.get_type(),
SplitImpl::Kinesis(k) => k.get_type(),
}
}

pub fn restore_from_bytes(split_type: String, bytes: &[u8]) -> Result<Self> {
match split_type.as_str() {
kafka::KAFKA_SPLIT_TYPE => KafkaSplit::restore_from_bytes(bytes).map(SplitImpl::Kafka),
pulsar::PULSAR_SPLIT_TYPE => {
PulsarSplit::restore_from_bytes(bytes).map(SplitImpl::Pulsar)
}
kinesis::split::KINESIS_SPLIT_TYPE => {
KinesisSplit::restore_from_bytes(bytes).map(SplitImpl::Kinesis)
}
other => Err(anyhow!("split type {} not supported", other)),
}
}
}

impl SplitEnumeratorImpl {
@@ -96,21 +143,26 @@ impl SplitEnumeratorImpl {
.list_splits()
.await
.map(|ss| ss.into_iter().map(SplitImpl::Pulsar).collect_vec()),
SplitEnumeratorImpl::Kinesis(k) => k
.list_splits()
.await
.map(|ss| ss.into_iter().map(SplitImpl::Kinesis).collect_vec()),
}
}
}

pub fn extract_split_enumerator(
properties: &HashMap<String, String>,
) -> Result<SplitEnumeratorImpl> {
let source_type = match properties.get("upstream.source") {
None => return Err(anyhow!("upstream.source not found")),
let source_type = match properties.get(UPSTREAM_SOURCE_KEY) {
None => return Err(anyhow!("{} not found", UPSTREAM_SOURCE_KEY)),
Some(value) => value,
};

match source_type.as_ref() {
"kafka" => KafkaSplitEnumerator::new(properties).map(SplitEnumeratorImpl::Kafka),
"pulsar" => PulsarSplitEnumerator::new(properties).map(SplitEnumeratorImpl::Pulsar),
match source_type.as_str() {
KAFKA_SOURCE => KafkaSplitEnumerator::new(properties).map(SplitEnumeratorImpl::Kafka),
PULSAR_SOURCE => PulsarSplitEnumerator::new(properties).map(SplitEnumeratorImpl::Pulsar),
KINESIS_SOURCE => todo!(),
_ => Err(anyhow!("unsupported source type: {}", source_type)),
}
}
@@ -121,14 +173,8 @@ pub async fn new_connector(
) -> Result<Box<dyn SourceReader + Send + Sync>> {
let upstream_type = config.get(UPSTREAM_SOURCE_KEY).unwrap();
let connector: Box<dyn SourceReader + Send + Sync> = match upstream_type.as_str() {
KAFKA_SOURCE => {
let kafka = KafkaSplitReader::new(config, state).await?;
Box::new(kafka)
}
KINESIS_SOURCE => {
let kinesis = KinesisSplitReader::new(config, state).await?;
Box::new(kinesis)
}
KAFKA_SOURCE => Box::new(KafkaSplitReader::new(config, state).await?),
KINESIS_SOURCE => Box::new(KinesisSplitReader::new(config, state).await?),
_other => {
todo!()
}
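
To make the intended lifecycle of SplitImpl concrete, here is a hedged sketch of the round trip this commit sets up: the meta node serializes a split with to_string(), ships it as bytes together with its get_type() tag (via StreamSourceState), and the source executor rebuilds it with restore_from_bytes(). Only methods shown in the diff above are used; the crate path is an assumption.

// Illustrative sketch of the split round trip enabled by base.rs; the crate
// path is an assumption, the method calls match the diff above.
use anyhow::Result;
use risingwave_connector::base::SplitImpl;

pub fn round_trip(split: &SplitImpl) -> Result<SplitImpl> {
    let split_type = split.get_type();   // e.g. "kafka", "pulsar", "kinesis"
    let encoded = split.to_string()?;    // the split serialized as a string payload
    // `split_type` and `encoded` are what travel from meta to the source
    // executor inside StreamSourceState.
    SplitImpl::restore_from_bytes(split_type, encoded.as_bytes())
}

Note also that extract_split_enumerator now keys off the shared UPSTREAM_SOURCE_KEY ("connector") property instead of the old "upstream.source" lookup, so enumerator selection and reader construction in new_connector agree on the same config key.
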
12 changes: 11 additions & 1 deletion src/connector/src/filesystem/s3/source/s3_file_reader.rs
@@ -15,7 +15,7 @@ use std::borrow::BorrowMut;
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Result;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use aws_sdk_s3::client as s3_client;
use aws_smithy_http::byte_stream::ByteStream;
@@ -69,6 +69,8 @@ impl Default for S3File {
}
}

const S3_SPLIT_TYPE: &str = "s3";

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct S3FileSplit {
bucket: String,
Expand Down Expand Up @@ -127,6 +129,14 @@ impl SourceSplit for S3FileSplit {
Err(anyhow::Error::from(split_str.err().unwrap()))
}
}

fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
}

fn get_type(&self) -> String {
S3_SPLIT_TYPE.to_string()
}
}

#[derive(Debug)]
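
The S3FileSplit changes follow the same pattern every split type now needs: a constant type tag plus serde_json-based restore. A minimal, hypothetical split implementing the extended SourceSplit trait might look like the following; the import path and the DemoSplit type are illustrative only, not part of the commit.

// Hypothetical example of the extended SourceSplit trait from base.rs; the
// import path and DemoSplit are illustrative assumptions.
use anyhow::{anyhow, Result};
use risingwave_connector::base::SourceSplit;
use serde::{Deserialize, Serialize};

const DEMO_SPLIT_TYPE: &str = "demo";

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DemoSplit {
    pub partition: String,
}

impl SourceSplit for DemoSplit {
    fn id(&self) -> String {
        self.partition.clone()
    }

    fn to_string(&self) -> Result<String> {
        serde_json::to_string(self).map_err(|e| anyhow!(e))
    }

    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
        serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
    }

    fn get_type(&self) -> String {
        DEMO_SPLIT_TYPE.to_string()
    }
}
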
(Remaining changed file diffs not rendered.)
