[feature][pulsar-io-mongo] Add support for full message synchronization #16003
pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
- stream.batchSize(mongoConfig.getBatchSize()).fullDocument(FullDocument.UPDATE_LOOKUP);
+ stream.batchSize(mongoSourceConfig.getBatchSize())
+         .fullDocument(FullDocument.UPDATE_LOOKUP);
+
+ if (SyncType.FULL_SYNC.equals(mongoSourceConfig.getSyncType())) {
+     // sync currently existing messages
+     // startAtOperationTime is the starting point for the change stream
+     // setting startAtOperationTime to 0 means the start point is the earliest
+     // see https://www.mongodb.com/docs/v4.2/reference/method/db.collection.watch/ for more information
+     stream.startAtOperationTime(new BsonTimestamp(0L));
+ }
Here is the key.
pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java
I will finish UT later. Convert this PR to a draft temporarily.
@315157973 PTAL 🎉
Please add some doc and make sure that the change passes the CI checks.
@315157973 OK. And where should I add the doc?
The PR had no activity for 30 days, mark with Stale label.
/pulsarbot run-failure-checks
@codelipenghui Thank you very much! But why are there three failed checks after the rerun, when there were four before? And I didn't make any changes to the broker.
@codelipenghui Hi, it seems that all the failed checks are flaky tests. What should I do? Thank you!
/pulsar-bot rerun-failure-checks
/pulsarbot rerun-failure-checks
/pulsarbot rerun-failure-checks
Hi @shink
This is a flaky test; it should be fine if it is run several times.
I read the log and found that the instability was caused by many tests in this group(
@poorbarcode Thank you so much for your help!
/pulsarbot rerun-failure-checks
/pulsarbot rerun-failure-checks
@shink seems you've already included docs in this PR, so I will update the
Motivation
Currently, the MongoDB source connector supports only incremental message synchronization.
This PR adds support for full message synchronization.
Since MongoDB 4.0, we can set the starting point for the change stream with the startAtOperationTime field. So, we can set it to 0 to make the start point the earliest.
See https://www.mongodb.com/docs/v4.2/reference/method/db.collection.watch/ for more information.
Modifications
Improve config object.
There are some commonalities and differences between sink configuration and source configuration. So, I created an abstract class called MongoAbstractConfig which contains the commonalities between them. MongoSourceConfig and MongoSinkConfig contain the unique configuration.
Add support for full message synchronization in the source connector.
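For readers who want to picture the config split described above, here is a minimal sketch in plain Java. It is not the code from this PR: only the class names MongoAbstractConfig, MongoSourceConfig, MongoSinkConfig and the SyncType.FULL_SYNC constant come from the PR. The field names (mongoUri, database, collection, batchSize, batchTimeMs), the INCR_SYNC constant, and the validate() method are assumptions made for illustration.

```java
public class MongoConfigSketch {

    /** FULL_SYNC appears in the PR diff; INCR_SYNC is an assumed name for the incremental mode. */
    enum SyncType { FULL_SYNC, INCR_SYNC }

    /** Settings assumed to be shared by source and sink (field names are hypothetical). */
    abstract static class MongoAbstractConfig {
        final String mongoUri;
        final String database;
        final String collection;
        final int batchSize;

        MongoAbstractConfig(String mongoUri, String database, String collection, int batchSize) {
            this.mongoUri = mongoUri;
            this.database = database;
            this.collection = collection;
            this.batchSize = batchSize;
        }

        /** Checks the shared settings; subclasses extend this with their own checks. */
        void validate() {
            if (mongoUri == null || mongoUri.isEmpty()) {
                throw new IllegalArgumentException("mongoUri must be set");
            }
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be positive");
            }
        }
    }

    /** Source-only settings: which synchronization mode to use. */
    static final class MongoSourceConfig extends MongoAbstractConfig {
        final SyncType syncType;

        MongoSourceConfig(String mongoUri, String database, String collection,
                          int batchSize, SyncType syncType) {
            super(mongoUri, database, collection, batchSize);
            this.syncType = syncType;
        }

        SyncType getSyncType() {
            return syncType;
        }

        @Override
        void validate() {
            super.validate();
            if (syncType == null) {
                throw new IllegalArgumentException("syncType must be set");
            }
        }
    }

    /** Sink-only settings: a hypothetical flush interval in milliseconds. */
    static final class MongoSinkConfig extends MongoAbstractConfig {
        final long batchTimeMs;

        MongoSinkConfig(String mongoUri, String database, String collection,
                        int batchSize, long batchTimeMs) {
            super(mongoUri, database, collection, batchSize);
            this.batchTimeMs = batchTimeMs;
        }

        @Override
        void validate() {
            super.validate();
            if (batchTimeMs <= 0) {
                throw new IllegalArgumentException("batchTimeMs must be positive");
            }
        }
    }

    public static void main(String[] args) {
        MongoSourceConfig source = new MongoSourceConfig(
                "mongodb://localhost:27017", "db", "coll", 100, SyncType.FULL_SYNC);
        source.validate();
        System.out.println("sync type: " + source.getSyncType());
    }
}
```

With this shape, the source connector can branch on getSyncType() as the diff does, while checks that apply to both connectors live in one place.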
Verifying this change
Does this pull request potentially affect one of the following parts:
No.
Documentation
Check the box below or label this PR directly.
Need to update docs?
doc-required (Your PR needs to update docs and you will update later)
doc-not-needed (Please explain why)
doc (Your PR contains doc changes)
doc-complete (Docs have been already added)