Skip to content

Commit

Permalink
feat(meta): add source info and stream source split info in get_clust… (
Browse files Browse the repository at this point in the history
risingwavelabs#4277)

feat(meta): add source info and stream source split info in get_cluster_info

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
KeXiangWang and mergify[bot] authored Aug 5, 2022
1 parent f97a836 commit ce17661
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 2 deletions.
3 changes: 3 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package meta;
import "catalog.proto";
import "common.proto";
import "hummock.proto";
import "source.proto";
import "stream_plan.proto";
import "user.proto";

Expand Down Expand Up @@ -217,6 +218,8 @@ message GetClusterInfoRequest {}
message GetClusterInfoResponse {
repeated common.WorkerNode worker_nodes = 1;
repeated TableFragments table_fragments = 2;
map<uint32, source.ConnectorSplits> actor_splits = 3;
map<uint32, catalog.StreamSourceInfo> stream_source_infos = 4;
}

service ScaleService {
Expand Down
4 changes: 4 additions & 0 deletions src/connector/src/source/kafka/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,8 @@ impl KafkaSplit {
self.topic.clone(),
)
}

pub fn get_topic_and_partition(&self) -> (String, i32) {
(self.topic.clone(), self.partition)
}
}
2 changes: 2 additions & 0 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub async fn cluster_info() -> anyhow::Result<()> {
let GetClusterInfoResponse {
worker_nodes,
table_fragments,
actor_splits: _,
stream_source_infos: _,
} = meta_client.get_cluster_info().await?;

// Fragment ID -> [Parallel Unit ID -> (Parallel Unit, Actor)]
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ pub async fn rpc_serve_with_store<S: MetaStore>(
env.clone(),
catalog_manager.clone(),
stream_manager,
source_manager,
source_manager.clone(),
cluster_manager.clone(),
fragment_manager.clone(),
ddl_lock.clone(),
Expand All @@ -430,6 +430,8 @@ pub async fn rpc_serve_with_store<S: MetaStore>(
barrier_manager.clone(),
fragment_manager.clone(),
cluster_manager.clone(),
source_manager,
catalog_manager.clone(),
ddl_lock,
);
let cluster_srv = ClusterServiceImpl::<S>::new(cluster_manager.clone());
Expand Down
43 changes: 42 additions & 1 deletion src/meta/src/rpc/service/scale_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use risingwave_pb::catalog::source::Info::StreamSource;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
GetClusterInfoRequest, GetClusterInfoResponse, PauseRequest, PauseResponse, ResumeRequest,
ResumeResponse,
};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use tokio::sync::RwLock;
use tonic::{Request, Response, Status};

use crate::barrier::{BarrierManagerRef, Command};
use crate::cluster::ClusterManagerRef;
use crate::manager::CatalogManagerRef;
use crate::model::MetadataModel;
use crate::storage::MetaStore;
use crate::stream::FragmentManagerRef;
use crate::stream::{FragmentManagerRef, SourceManagerRef};

pub struct ScaleServiceImpl<S: MetaStore> {
barrier_manager: BarrierManagerRef<S>,
fragment_manager: FragmentManagerRef<S>,
cluster_manager: ClusterManagerRef<S>,
source_manager: SourceManagerRef<S>,
catalog_manager: CatalogManagerRef<S>,
ddl_lock: Arc<RwLock<()>>,
}

Expand All @@ -44,12 +50,16 @@ where
barrier_manager: BarrierManagerRef<S>,
fragment_manager: FragmentManagerRef<S>,
cluster_manager: ClusterManagerRef<S>,
source_manager: SourceManagerRef<S>,
catalog_manager: CatalogManagerRef<S>,
ddl_lock: Arc<RwLock<()>>,
) -> Self {
Self {
barrier_manager,
fragment_manager,
cluster_manager,
source_manager,
catalog_manager,
ddl_lock,
}
}
Expand Down Expand Up @@ -92,9 +102,40 @@ where
.list_worker_node(WorkerType::ComputeNode, None)
.await;

let actor_splits = self
.source_manager
.get_actor_splits()
.await
.into_iter()
.map(|(actor_id, splits)| {
(
actor_id,
ConnectorSplits {
splits: splits.iter().map(ConnectorSplit::from).collect(),
},
)
})
.collect();

let sources = self
.catalog_manager
.get_catalog_core_guard()
.await
.list_sources()
.await?;

let mut stream_source_infos = HashMap::new();
for source in sources {
if let Some(StreamSource(info)) = source.info {
stream_source_infos.insert(source.id, info);
}
}

Ok(Response::new(GetClusterInfoResponse {
worker_nodes,
table_fragments,
actor_splits,
stream_source_infos,
}))
}
}
8 changes: 8 additions & 0 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ where
}
}
}

pub fn get_actor_splits(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
self.actor_splits.clone()
}
}

pub(crate) fn fetch_source_fragments(
Expand Down Expand Up @@ -759,4 +763,8 @@ where
.cloned()
.collect_vec()
}

pub async fn get_actor_splits(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
self.core.lock().await.get_actor_splits()
}
}

0 comments on commit ce17661

Please sign in to comment.