feat: add state for source manager #3170

Merged: 6 commits, Jun 13, 2022
Changes from 4 commits
10 changes: 10 additions & 0 deletions proto/meta.proto
@@ -161,3 +161,13 @@ message SubscribeResponse {
service NotificationService {
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse);
}

message SourceActorSplit {
string type = 1;
bytes split = 2;
}

message SourceActorInfo {
uint32 actor_id = 1;
repeated SourceActorSplit splits = 2;
}
268 changes: 232 additions & 36 deletions src/meta/src/stream/source_manager.rs
@@ -13,7 +13,8 @@
// limitations under the License.

use std::borrow::BorrowMut;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

@@ -24,9 +25,13 @@ use risingwave_common::error::{internal_error, Result, ToRwResult};
use risingwave_common::try_match_expand;
use risingwave_connector::{ConnectorProperties, SplitEnumeratorImpl, SplitImpl};
use risingwave_pb::catalog::source::Info;
use risingwave_pb::catalog::source::Info::StreamSource;
use risingwave_pb::catalog::Source;
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::{
SourceActorInfo as ProstSourceActorInfo, SourceActorSplit as ProstSourceActorSplit,
};
use risingwave_pb::stream_service::{
CreateSourceRequest as ComputeNodeCreateSourceRequest,
DropSourceRequest as ComputeNodeDropSourceRequest,
@@ -41,12 +46,14 @@ use tokio::{select, time};
use crate::barrier::BarrierManagerRef;
use crate::cluster::ClusterManagerRef;
use crate::manager::{CatalogManagerRef, MetaSrvEnv, SourceId};
use crate::model::{ActorId, FragmentId};
use crate::storage::MetaStore;
use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, Transactional};
use crate::storage::{MetaStore, Transaction};
use crate::stream::FragmentManagerRef;

pub type SourceManagerRef<S> = Arc<SourceManager<S>>;

const SOURCE_CF_NAME: &str = "cf/source";

#[allow(dead_code)]
pub struct SourceManager<S: MetaStore> {
env: MetaSrvEnv<S>,
@@ -70,6 +77,50 @@ pub struct ConnectorSourceWorker {
period: Duration,
}

#[derive(Debug)]
pub struct SourceActorInfo {
actor_id: ActorId,
splits: Vec<SplitImpl>,
}

impl MetadataModel for SourceActorInfo {
type KeyType = u32;
type ProstType = ProstSourceActorInfo;

fn cf_name() -> String {
SOURCE_CF_NAME.to_string()
}

fn to_protobuf(&self) -> Self::ProstType {
Self::ProstType {
actor_id: self.actor_id,
splits: self
.splits
.iter()
.map(|split| ProstSourceActorSplit {
r#type: split.get_type(),
split: split.to_json_bytes().to_vec(),
})
.collect(),
}
}

fn from_protobuf(prost: Self::ProstType) -> Self {
Self {
actor_id: prost.actor_id,
splits: prost
.splits
.into_iter()
.map(|split| SplitImpl::restore_from_bytes(split.r#type, &split.split).unwrap())
.collect(),
}
}

fn key(&self) -> Result<Self::KeyType> {
Ok(self.actor_id)
}
}

impl ConnectorSourceWorker {
pub async fn create(source: &Source, period: Duration) -> Result<Self> {
let source_id = source.get_id();
@@ -135,20 +186,25 @@ pub struct ConnectorSourceWorkerHandle {
pub struct SourceManagerCore<S: MetaStore> {
pub fragment_manager: FragmentManagerRef<S>,
pub managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
pub source_fragments: HashMap<SourceId, Vec<FragmentId>>,
pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

impl<S> SourceManagerCore<S>
where
S: MetaStore,
{
fn new(fragment_manager: FragmentManagerRef<S>) -> Self {
fn new(
fragment_manager: FragmentManagerRef<S>,
managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
) -> Self {
Self {
fragment_manager,
managed_sources: HashMap::new(),
source_fragments: HashMap::new(),
actor_splits: HashMap::new(),
managed_sources,
source_fragments,
actor_splits,
}
}

@@ -220,26 +276,76 @@ where

pub async fn patch_diff(
&mut self,
source_fragments: Option<HashMap<SourceId, Vec<FragmentId>>>,
source_fragments: Option<HashMap<SourceId, BTreeSet<FragmentId>>>,
actor_splits: Option<HashMap<ActorId, Vec<SplitImpl>>>,
) -> Result<()> {
) {
if let Some(source_fragments) = source_fragments {
for (source_id, mut fragment_ids) in source_fragments {
self.source_fragments
.entry(source_id)
.or_insert(vec![])
.or_insert_with(BTreeSet::default)
.append(&mut fragment_ids);
}
}

if let Some(actor_splits) = actor_splits {
for (actor_id, splits) in actor_splits {
self.actor_splits.insert(actor_id, splits);
// TODO store state
self.actor_splits.insert(actor_id, splits.clone());
}
}
}

Ok(())
pub async fn drop_diff(
&mut self,
source_fragments: Option<HashMap<SourceId, BTreeSet<FragmentId>>>,
actor_splits: Option<HashSet<ActorId>>,
) {
if let Some(source_fragments) = source_fragments {
for (source_id, fragment_ids) in source_fragments {
if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
let managed_fragment_ids = entry.get_mut();
for fragment_id in &fragment_ids {
managed_fragment_ids.remove(fragment_id);
}

if managed_fragment_ids.is_empty() {
entry.remove();
}
}
}
}

if let Some(actor_splits) = actor_splits {
for actor_id in actor_splits {
self.actor_splits.remove(&actor_id);
}
}
}
}

pub(crate) fn fetch_source_fragments(
source_fragments: &mut HashMap<SourceId, BTreeSet<FragmentId>>,
table_fragments: &TableFragments,
) {
for fragment in table_fragments.fragments() {
for actor in &fragment.actors {
if let Some(source_id) =
TableFragments::fetch_stream_source_id(actor.nodes.as_ref().unwrap())
{
source_fragments
.entry(source_id)
.or_insert(BTreeSet::new())
.insert(fragment.fragment_id as FragmentId);

break;
}
}
}
}

@@ -300,7 +406,35 @@ where
catalog_manager: CatalogManagerRef<S>,
fragment_manager: FragmentManagerRef<S>,
) -> Result<Self> {
let core = Arc::new(Mutex::new(SourceManagerCore::new(fragment_manager)));
let mut managed_sources = HashMap::new();
{
let catalog_guard = catalog_manager.get_catalog_core_guard().await;
let sources = catalog_guard.list_sources().await?;

for source in sources {
if let Some(StreamSource(_)) = source.info {
Self::create_source_worker(&source, &mut managed_sources).await?
}
}
}

let mut source_fragments = HashMap::new();
for table_fragments in fragment_manager.list_table_fragments().await? {
fetch_source_fragments(&mut source_fragments, &table_fragments)
}

let actor_splits = SourceActorInfo::list(env.meta_store())
.await?
.into_iter()
.map(|source_actor_info| (source_actor_info.actor_id, source_actor_info.splits))
.collect();

let core = Arc::new(Mutex::new(SourceManagerCore::new(
fragment_manager,
managed_sources,
source_fragments,
actor_splits,
)));

Ok(Self {
env,
@@ -311,19 +445,65 @@
})
}

pub async fn drop_update(
&self,
source_fragments: Option<HashMap<SourceId, BTreeSet<FragmentId>>>,
actor_splits: Option<HashSet<ActorId>>,
) -> Result<()> {
{
let mut core = self.core.lock().await;
core.drop_diff(source_fragments, actor_splits.clone()).await;
}

let mut trx = Transaction::default();
if let Some(actor_ids) = actor_splits {
for actor_id in actor_ids {
let source_actor_info = SourceActorInfo {
actor_id,
splits: vec![],
};
source_actor_info.delete_in_transaction(&mut trx)?;
}
}

self.env
.meta_store()
.txn(trx)
.await
.map_err(|e| internal_error(e.to_string()))
}

pub async fn patch_update(
&self,
source_fragments: Option<HashMap<SourceId, Vec<FragmentId>>>,
source_fragments: Option<HashMap<SourceId, BTreeSet<FragmentId>>>,
actor_splits: Option<HashMap<ActorId, Vec<SplitImpl>>>,
) {
let mut core = self.core.lock().await;
let _ = core.patch_diff(source_fragments, actor_splits).await;
) -> Result<()> {
{
let mut core = self.core.lock().await;
core.patch_diff(source_fragments, actor_splits.clone())
.await;
}

let mut trx = Transaction::default();
if let Some(actor_splits) = actor_splits {
for (actor_id, splits) in actor_splits {
let source_actor_info = SourceActorInfo { actor_id, splits };

source_actor_info.upsert_in_transaction(&mut trx)?;
}
}

self.env
.meta_store()
.txn(trx)
.await
.map_err(|e| internal_error(e.to_string()))
}

pub async fn pre_allocate_splits(
&self,
table_id: &TableId,
source_fragments: HashMap<SourceId, Vec<FragmentId>>,
source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
) -> Result<HashMap<ActorId, Vec<SplitImpl>>> {
let core = self.core.lock().await;
let table_fragments = core
@@ -412,27 +592,36 @@ where
return Ok(());
}

if let Some(Info::StreamSource(_)) = source.info {
let mut worker = ConnectorSourceWorker::create(source, Duration::from_secs(10)).await?;
let current_splits_ref = worker.current_splits.clone();
log::info!("Spawning new watcher for source {}", source.id);

let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();

let handle = tokio::spawn(async move { worker.run(sync_call_rx).await });
core.managed_sources.insert(
source.id,
ConnectorSourceWorkerHandle {
handle,
sync_call_tx,
splits: current_splits_ref,
},
);
if let Some(StreamSource(_)) = source.info {
Self::create_source_worker(source, &mut core.managed_sources).await?;
}

Ok(())
}

async fn create_source_worker(
source: &Source,
managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
) -> Result<()> {
let mut worker = ConnectorSourceWorker::create(source, Duration::from_secs(10)).await?;
let current_splits_ref = worker.current_splits.clone();
log::info!("spawning new watcher for source {}", source.id);

let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();

let handle = tokio::spawn(async move { worker.run(sync_call_rx).await });
managed_sources.insert(
source.id,
ConnectorSourceWorkerHandle {
handle,
sync_call_tx,
splits: current_splits_ref,
},
);

Ok(())
}

pub async fn drop_source(&self, source_id: SourceId) -> Result<()> {
let futures = self
.all_stream_clients()
@@ -449,6 +638,13 @@ where
handle.handle.abort();
}

if core.source_fragments.contains_key(&source_id) {
log::warn!(
"dropping source {}, but associated fragments still exists",
source_id
);
}

Ok(())
}
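
For context beyond the diff itself, below is a minimal caller-side sketch of how the new patch_update / drop_update entry points might be exercised. It is an illustration only, not code from this PR: the function persist_and_clear_splits, its arguments, and the use-path for SourceManagerRef are assumptions based on the imports and signatures visible in the diff above.

// Hypothetical usage sketch (not part of this PR). Assumes the types shown in
// the diff above; the exact re-export path of SourceManagerRef may differ.
use std::collections::{BTreeSet, HashMap, HashSet};

use risingwave_common::error::Result;
use risingwave_connector::SplitImpl;

use crate::manager::SourceId;
use crate::model::{ActorId, FragmentId};
use crate::storage::MetaStore;
use crate::stream::SourceManagerRef; // assumed re-export of source_manager::SourceManagerRef

async fn persist_and_clear_splits<S: MetaStore>(
    source_manager: SourceManagerRef<S>,
    source_id: SourceId,
    fragment_id: FragmentId,
    actor_id: ActorId,
    splits: Vec<SplitImpl>,
) -> Result<()> {
    // Register the fragment under its source and persist the actor's split
    // assignment; the meta-store transaction happens inside patch_update.
    let source_fragments = HashMap::from([(source_id, BTreeSet::from([fragment_id]))]);
    let actor_splits = HashMap::from([(actor_id, splits)]);
    source_manager
        .patch_update(Some(source_fragments), Some(actor_splits))
        .await?;

    // When the actor goes away, drop both the in-memory entry and the
    // persisted SourceActorInfo record in one transaction.
    source_manager
        .drop_update(
            Some(HashMap::from([(source_id, BTreeSet::from([fragment_id]))])),
            Some(HashSet::from([actor_id])),
        )
        .await
}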