-
Notifications
You must be signed in to change notification settings - Fork 600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Use version to synchronize catalog #749
Conversation
Codecov Report
@@ Coverage Diff @@
## main #749 +/- ##
============================================
- Coverage 72.42% 72.42% -0.01%
Complexity 2762 2762
============================================
Files 918 918
Lines 53289 53304 +15
Branches 1787 1787
============================================
+ Hits 38594 38604 +10
- Misses 13801 13806 +5
Partials 894 894
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM.
while *rx.borrow() < version { | ||
rx.changed() | ||
.await | ||
.map_err(|e| RwError::from(InternalError(e.to_string())))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may extract this to a separated function like wait_version
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. The frontend should keep a max committed version in memory and could be initialized when start and fetch from meta. So the meta should stores a global catalog max committed version. This will be helpful for catalog MVCC implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Add a struct CatalogVersionGenerator
in CatalogManagerCore
of meta.
rust/meta/src/manager/catalog.rs
Outdated
@@ -150,7 +160,7 @@ where | |||
table.insert(&*self.meta_store_ref).await?; | |||
core.add_table(table.clone()); | |||
|
|||
if let TableInfo::MaterializedView(mview_info) = table.get_info().unwrap() { | |||
if let TableInfo::MaterializedView(mview_info) = table.get_info()? { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The info
must exist and we should unwrap
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM
So this PR is saying: Update local catalog cache iff the version from meta is larger than local? |
Yes. When |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least better than check database by name.
Not dive into detail. LGTM.
.get_schema(db_name, schema_name) | ||
.is_none() | ||
{ | ||
while *rx.borrow() < version { | ||
rx.changed() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that the version gets changed between *rx.borrow() < version
and rx.changed()
, and the program gets stuck?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems no problem. My fault.
@@ -90,7 +90,7 @@ impl NotificationManager { | |||
})) | |||
.await | |||
.into_iter() | |||
.collect::<RwResult<()>>() | |||
.collect::<Result<()>>() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try_collect?
.get_database(db_name) | ||
.is_some() | ||
{ | ||
while *rx.borrow() < version { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that we are using this in a wrong way...
This method does not mark the returned value as seen, so future calls to changed may return immediately even if you have already seen the value with a call to borrow.
should call borrow_and_update instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Otherwise .changed
will always return immediately, and we are spinning on this thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the version is not latest, the current approach may cause the condition being checked twice. e.g., rx.borrow() returns 1, but as we didn't call changed
, the following changed
will immediately return, and we will get rx.borrow() == 1
again. But this time, 1
is marked seen, so there won't be problem, and we will wait for the next update.
I'd recommend using borrow_and_update
for this scenario, but there won't be problem if we don't modify this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the version is not latest, the current approach may cause the condition being checked twice. e.g., rx.borrow() returns 1, but as we didn't call
changed
, the followingchanged
will immediately return, and we will getrx.borrow() == 1
again. But this time,1
is marked seen, so there won't be problem.I'd recommend using
borrow_and_update
for this scenario, but there won't be problem if we don't modify this.
Yes, you are right. I will change it.
What's changed and what's your intention?
Use version to synchronize catalog.
There is a
version
in return value ofCreate
andDrop
RPC. Meta will send a notification with a biggerversion
.create/drop
inCatalogConnector
will wait until the value in message fromObserverManager
is bigger than the return value ofCreate
andDrop
RPC.Checklist
Refer to a related PR or issue link (optional)
related #567 #361