Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use version to synchronize catalog #749

Merged
merged 4 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ message DropRequest {

// Response returned for a drop request.
message DropResponse {
// Outcome of the drop operation.
common.Status status = 1;
// Catalog version associated with this drop. NOTE(review): presumably the
// version the frontend waits to observe on its catalog-update channel
// before the drop call returns -- confirm against the meta service.
uint64 version = 2;
}

message GetCatalogRequest {}
Expand Down
2 changes: 2 additions & 0 deletions rust/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use core::fmt;
pub use column::*;
pub use schema::*;

/// Version number of the catalog, represented as an unsigned 64-bit integer.
pub type CatalogVersion = u64;

pub enum CatalogId {
DatabaseId(DatabaseId),
SchemaId(SchemaId),
Expand Down
77 changes: 19 additions & 58 deletions rust/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use risingwave_common::array::RwError;
use risingwave_common::catalog::CatalogVersion;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::Result;
use risingwave_common::types::DataType;
Expand Down Expand Up @@ -188,14 +189,14 @@ impl TryFrom<Catalog> for CatalogCache {
pub struct CatalogConnector {
meta_client: MetaClient,
catalog_cache: Arc<RwLock<CatalogCache>>,
catalog_updated_rx: Receiver<i32>,
catalog_updated_rx: Receiver<CatalogVersion>,
}

impl CatalogConnector {
pub fn new(
meta_client: MetaClient,
catalog_cache: Arc<RwLock<CatalogCache>>,
catalog_updated_rx: Receiver<i32>,
catalog_updated_rx: Receiver<CatalogVersion>,
) -> Self {
Self {
meta_client,
Expand All @@ -205,22 +206,16 @@ impl CatalogConnector {
}

pub async fn create_database(&self, db_name: &str) -> Result<()> {
self.meta_client
let (_, version) = self
.meta_client
.create_database(Database {
database_name: db_name.to_string(),
// Do not support MVCC DDL now.
..Default::default()
})
.await?;
// TODO(zehua) Implement `catalog_version` in meta server.
let mut rx = self.catalog_updated_rx.clone();
while self
.catalog_cache
.read()
.unwrap()
.get_database(db_name)
.is_none()
{
while *rx.borrow() < version {
rx.changed()
.await
.map_err(|e| RwError::from(InternalError(e.to_string())))?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could extract this into a separate function, e.g. `wait_version`.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. The frontend should keep a max committed version in memory, which could be initialized at startup by fetching it from meta. So meta should store a global max committed catalog version. This will be helpful for the catalog MVCC implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added a struct `CatalogVersionGenerator` to meta's `CatalogManagerCore`.

Expand All @@ -236,7 +231,8 @@ impl CatalogConnector {
.get_database(db_name)
.ok_or_else(|| RwError::from(CatalogError::NotFound("database", db_name.to_string())))?
.id();
self.meta_client
let (_, version) = self
.meta_client
.create_schema(Schema {
schema_name: schema_name.to_string(),
version: 0,
Expand All @@ -248,15 +244,8 @@ impl CatalogConnector {
}),
})
.await?;
// TODO(zehua) Implement `catalog_version` in meta server.
let mut rx = self.catalog_updated_rx.clone();
while self
.catalog_cache
.read()
.unwrap()
.get_schema(db_name, schema_name)
.is_none()
{
while *rx.borrow() < version {
rx.changed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that the version gets changed between *rx.borrow() < version and rx.changed(), and the program gets stuck?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems there is no problem. My mistake.

.await
.map_err(|e| RwError::from(InternalError(e.to_string())))?;
Expand Down Expand Up @@ -303,16 +292,9 @@ impl CatalogConnector {
..Default::default()
},
);
self.meta_client.create_table(table.clone()).await?;
// TODO(zehua) Implement `catalog_version` in meta server.
let (_, version) = self.meta_client.create_table(table.clone()).await?;
let mut rx = self.catalog_updated_rx.clone();
while self
.catalog_cache
.read()
.unwrap()
.get_table(db_name, schema_name, &table.table_name)
.is_none()
{
while *rx.borrow() < version {
rx.changed()
.await
.map_err(|e| RwError::from(InternalError(e.to_string())))?;
Expand All @@ -335,16 +317,9 @@ impl CatalogConnector {
.id();

let table_ref_id = TableRefId::from(&table_id);
self.meta_client.drop_table(table_ref_id).await?;
// TODO(zehua) Implement `catalog_version` in meta server.
let version = self.meta_client.drop_table(table_ref_id).await?;
let mut rx = self.catalog_updated_rx.clone();
while self
.catalog_cache
.read()
.unwrap()
.get_table(db_name, schema_name, table_name)
.is_some()
{
while *rx.borrow() < version {
rx.changed()
.await
.map_err(|e| RwError::from(InternalError(e.to_string())))?;
Expand Down Expand Up @@ -374,16 +349,9 @@ impl CatalogConnector {
database_ref_id: Some(DatabaseRefId { database_id }),
schema_id,
};
self.meta_client.drop_schema(schema_ref_id).await?;
// TODO(zehua) Implement `catalog_version` in meta server.
let version = self.meta_client.drop_schema(schema_ref_id).await?;
let mut rx = self.catalog_updated_rx.clone();
while self
.catalog_cache
.read()
.unwrap()
.get_schema(db_name, schema_name)
.is_some()
{
while *rx.borrow() < version {
rx.changed()
.await
.map_err(|e| RwError::from(InternalError(e.to_string())))?;
Expand All @@ -400,16 +368,9 @@ impl CatalogConnector {
.ok_or_else(|| RwError::from(CatalogError::NotFound("database", db_name.to_string())))?
.id() as i32;
let database_ref_id = DatabaseRefId { database_id };
self.meta_client.drop_database(database_ref_id).await?;
// TODO(zehua) Implement `catalog_version` in meta server.
let version = self.meta_client.drop_database(database_ref_id).await?;
let mut rx = self.catalog_updated_rx.clone();
while self
.catalog_cache
.read()
.unwrap()
.get_database(db_name)
.is_some()
{
while *rx.borrow() < version {
Copy link
Contributor

@skyzh skyzh Mar 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we are using this in a wrong way...

This method does not mark the returned value as seen, so future calls to changed may return immediately even if you have already seen the value with a call to borrow.

Should we call `borrow_and_update` instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise `.changed()` will always return immediately, and we will be spinning on this thread.

Copy link
Contributor

@skyzh skyzh Mar 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the version is not the latest, the current approach may cause the condition to be checked twice. E.g., `rx.borrow()` returns 1, but as we haven't called `changed` yet, the following `changed` will return immediately, and we will get `rx.borrow() == 1` again. But this time, 1 is marked as seen, so there won't be a problem, and we will wait for the next update.

I'd recommend using `borrow_and_update` for this scenario, but there won't be a problem if we don't change this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the version is not latest, the current approach may cause the condition being checked twice. e.g., rx.borrow() returns 1, but as we didn't call changed, the following changed will immediately return, and we will get rx.borrow() == 1 again. But this time, 1 is marked seen, so there won't be problem.

I'd recommend using borrow_and_update for this scenario, but there won't be problem if we don't modify this.

Yes, you are right. I will change it.

rx.changed()
.await
.map_err(|e| RwError::from(InternalError(e.to_string())))?;
Expand Down Expand Up @@ -471,7 +432,7 @@ mod tests {
use crate::catalog::catalog_service::{
CatalogCache, CatalogConnector, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME,
};
use crate::observer::observer_manager::{ObserverManager, UPDATE_FINISH_NOTIFICATION};
use crate::observer::observer_manager::ObserverManager;
use crate::scheduler::schedule::WorkerNodeManager;
use crate::test_utils::FrontendMockMetaClient;

Expand Down Expand Up @@ -501,7 +462,7 @@ mod tests {
// Init meta and catalog.
let meta_client = MetaClient::mock(FrontendMockMetaClient::new().await);

let (catalog_updated_tx, catalog_updated_rx) = watch::channel(UPDATE_FINISH_NOTIFICATION);
let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
let catalog_cache = Arc::new(RwLock::new(
CatalogCache::new(meta_client.clone()).await.unwrap(),
));
Expand Down
16 changes: 9 additions & 7 deletions rust/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::{Arc, RwLock};

use log::{error, info};
use risingwave_common::array::RwError;
use risingwave_common::catalog::CatalogVersion;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::Result;
use risingwave_pb::common::{WorkerNode, WorkerType};
Expand All @@ -16,16 +17,14 @@ use crate::catalog::catalog_service::CatalogCache;
use crate::catalog::{CatalogError, DatabaseId, SchemaId};
use crate::scheduler::schedule::WorkerNodeManagerRef;

pub const UPDATE_FINISH_NOTIFICATION: i32 = 0;

/// `ObserverManager` is used to update data based on notification from meta.
/// Call `start` to spawn a new asynchronous task
/// which receives meta's notification and update frontend's data.
pub(crate) struct ObserverManager {
rx: Box<dyn NotificationStream>,
worker_node_manager: WorkerNodeManagerRef,
catalog_cache: Arc<RwLock<CatalogCache>>,
catalog_updated_tx: Sender<i32>,
catalog_updated_tx: Sender<CatalogVersion>,
}

impl ObserverManager {
Expand All @@ -34,7 +33,7 @@ impl ObserverManager {
addr: SocketAddr,
worker_node_manager: WorkerNodeManagerRef,
catalog_cache: Arc<RwLock<CatalogCache>>,
catalog_updated_tx: Sender<i32>,
catalog_updated_tx: Sender<CatalogVersion>,
) -> Self {
let rx = client.subscribe(addr, WorkerType::Frontend).await.unwrap();
Self {
Expand Down Expand Up @@ -105,6 +104,7 @@ impl ObserverManager {
);
let db_name = database.get_database_name();
let db_id = database.get_database_ref_id()?.database_id as u64;
let version = database.get_version();

match operation {
Operation::Add => self
Expand All @@ -121,7 +121,7 @@ impl ObserverManager {
}

self.catalog_updated_tx
.send(UPDATE_FINISH_NOTIFICATION)
.send(version)
.map_err(|e| RwError::from(InternalError(e.to_string())))
}

Expand All @@ -142,6 +142,7 @@ impl ObserverManager {
.ok_or_else(|| CatalogError::NotFound("database id", db_id.to_string()))?;
let schema_name = schema.get_schema_name();
let schema_id = schema_ref_id.schema_id as SchemaId;
let version = schema.get_version();

match operation {
Operation::Add => self.catalog_cache.write().unwrap().create_schema(
Expand All @@ -162,7 +163,7 @@ impl ObserverManager {
}

self.catalog_updated_tx
.send(UPDATE_FINISH_NOTIFICATION)
.send(version)
.map_err(|e| RwError::from(InternalError(e.to_string())))
}

Expand Down Expand Up @@ -190,6 +191,7 @@ impl ObserverManager {
.unwrap()
.get_schema_name(schema_id)
.ok_or_else(|| CatalogError::NotFound("schema id", schema_id.to_string()))?;
let version = table.get_version();

match operation {
Operation::Add => {
Expand All @@ -211,7 +213,7 @@ impl ObserverManager {
}

self.catalog_updated_tx
.send(UPDATE_FINISH_NOTIFICATION)
.send(version)
.map_err(|e| RwError::from(InternalError(e.to_string())))
}
}
4 changes: 2 additions & 2 deletions rust/frontend/src/scheduler/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,14 @@ mod tests {

use super::WorkerNodeManager;
use crate::catalog::catalog_service::CatalogCache;
use crate::observer::observer_manager::{ObserverManager, UPDATE_FINISH_NOTIFICATION};
use crate::observer::observer_manager::ObserverManager;
use crate::test_utils::FrontendMockMetaClient;

#[tokio::test]
async fn test_add_and_delete_worker_node() {
let mut meta_client = MetaClient::mock(FrontendMockMetaClient::new().await);

let (catalog_updated_tx, _) = watch::channel(UPDATE_FINISH_NOTIFICATION);
let (catalog_updated_tx, _) = watch::channel(0);
let catalog_cache = Arc::new(RwLock::new(
CatalogCache::new(meta_client.clone()).await.unwrap(),
));
Expand Down
4 changes: 2 additions & 2 deletions rust/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::catalog::catalog_service::{
CatalogCache, CatalogConnector, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME,
};
use crate::handler::handle;
use crate::observer::observer_manager::{ObserverManager, UPDATE_FINISH_NOTIFICATION};
use crate::observer::observer_manager::ObserverManager;
use crate::scheduler::schedule::WorkerNodeManager;
use crate::FrontendOpts;
pub struct QueryContext<'a> {
Expand Down Expand Up @@ -49,7 +49,7 @@ impl FrontendEnv {
// Register in meta by calling `AddWorkerNode` RPC.
meta_client.register(host, WorkerType::Frontend).await?;

let (catalog_updated_tx, catalog_updated_rx) = watch::channel(UPDATE_FINISH_NOTIFICATION);
let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
let catalog_cache = Arc::new(RwLock::new(CatalogCache::new(meta_client.clone()).await?));
let catalog_manager = CatalogConnector::new(
meta_client.clone(),
Expand Down
10 changes: 7 additions & 3 deletions rust/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,13 @@ impl FrontendMockMetaClient {

let notification_manager = Arc::new(NotificationManager::new());
let catalog_manager = Arc::new(
StoredCatalogManager::new(meta_store.clone(), notification_manager.clone())
.await
.unwrap(),
StoredCatalogManager::new(
meta_store.clone(),
epoch_generator.clone(),
notification_manager.clone(),
)
.await
.unwrap(),
);

let cluster_manager = Arc::new(
Expand Down
Loading