Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-88] Replicate schemas across clusters #11441

Merged
merged 4 commits into from
Aug 8, 2021

Conversation

codelipenghui
Copy link
Contributor

Motivation

Here is the proposal: https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple
For the implementation, we just need to set the correct SchemaInfo for the replicated message and using
the AutoProduceByte schema for the producer of the replicator.

Verifying this change

Added new tests to verify the consumer can consume messages with schema from the remote cluster.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Here is the proposal: https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple
For the implementation, we just need to set the correct SchemaInfo for the replicated message and using
the AutoProduceByte schema for the producer of the replicator.
@@ -64,6 +64,7 @@
private ByteBuf payload;

private Schema<T> schema;
private SchemaInfo schemaInfo;
Copy link
Contributor

@315157973 315157973 Jul 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little confused about the modification.
We can get schemaInfo from schema now.
Can we use public static Schema<?> getSchema(SchemaInfo schemaInfo) in AutoConsumeSchema to convert schemaInfo to Schema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the schema replication, we don't need to convert schemaInfo to schema for the local topic and then convert the schema to the schemaInfo for the remote topic. Or we only need to pass the schemaInfo to the message which need to replicate to the remote cluster.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have a schema, but we added a schemaInfo, which will make other developers very confused. If we can't reuse existing attributes, we can at least do this:

  1. Make the new properties easier to understand, such as: naming modification, or adding comments, so that other developers can know schemaInfo only for replicator
  2. Independent get/set

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I second @315157973 concern.
We already have "getReaderSchema", why cannot we use that ?

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the change looks good, apart the fact that we are adding that SchemaInfo field from MessageImpl

@@ -64,6 +64,7 @@
private ByteBuf payload;

private Schema<T> schema;
private SchemaInfo schemaInfo;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I second @315157973 concern.
We already have "getReaderSchema", why cannot we use that ?

@@ -418,14 +419,24 @@ private void ensureSchemaIsLoaded() {
}
}

private SchemaInfo getSchemaInfo() {
public SchemaInfo getSchemaInfo() {
if (schemaInfo != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is quite confusing, we should use what the internal Schema is reporting

@codelipenghui
Copy link
Contributor Author

@315157973 @eolivelli I have addressed your comments, PTAL.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm

Great work

@eolivelli eolivelli merged commit a09bd68 into apache:master Aug 8, 2021
@codelipenghui codelipenghui deleted the penghui/pip-88 branch August 8, 2021 10:55
LeBW pushed a commit to LeBW/pulsar that referenced this pull request Aug 9, 2021
* [PIP-88] Replicate schemas accross clusters

Here is the proposal: https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple
For the implementation, we just need to set the correct SchemaInfo for the replicated message and using
the AutoProduceByte schema for the producer of the
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
* [PIP-88] Replicate schemas accross clusters

Here is the proposal: https://github.com/apache/pulsar/wiki/PIP-88%3A-Replicate-schemas-across-multiple
For the implementation, we just need to set the correct SchemaInfo for the replicated message and using
the AutoProduceByte schema for the producer of the
@codelipenghui codelipenghui restored the penghui/pip-88 branch May 17, 2022 01:21
@codelipenghui codelipenghui deleted the penghui/pip-88 branch May 17, 2022 01:30
codelipenghui added a commit to codelipenghui/incubator-pulsar that referenced this pull request Aug 10, 2022
### Motivation

apache#11441 supports replicate schema to remote clusters.
But there is a mistake that the returned schema state is incorrect.

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770

Because the replicator used MessageImpl will not have the schema.
And this will cause the producer to skip the schema upload.

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149

We should remove

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768

To return the correct schema state.

And then we should also provide the correct schema hash.
If the message is used by the replicator, the schema hash should
be based on the replicator schema. Otherwise, it should use based
on the schema of the message.

### Modification

- Fixed the incorrect returned schema state
- Provide the method for getting schema hash for MessageImpl

### Verification

Update the test only to create producer to one cluster.
Because if create a producer for other clusters, the producer will
upload the schema. This is the reason that why the test can get
pass before.
codelipenghui added a commit that referenced this pull request Aug 18, 2022
### Motivation

#11441 supports replicate schema to remote clusters.
But there is a mistake that the returned schema state is incorrect.

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770

Because the replicator used MessageImpl will not have the schema.
And this will cause the producer to skip the schema upload.

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149

We should remove

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768

To return the correct schema state.

And then we should also provide the correct schema hash.
If the message is used by the replicator, the schema hash should
be based on the replicator schema. Otherwise, it should use based
on the schema of the message.

### Modification

- Fixed the incorrect returned schema state
- Provide the method for getting schema hash for MessageImpl
Technoboy- pushed a commit that referenced this pull request Aug 18, 2022
### Motivation

#11441 supports replicate schema to remote clusters.
But there is a mistake that the returned schema state is incorrect.

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770

Because the replicator used MessageImpl will not have the schema.
And this will cause the producer to skip the schema upload.

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149

We should remove

https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768

To return the correct schema state.

And then we should also provide the correct schema hash.
If the message is used by the replicator, the schema hash should
be based on the replicator schema. Otherwise, it should use based
on the schema of the message.

### Modification

- Fixed the incorrect returned schema state
- Provide the method for getting schema hash for MessageImpl
@BewareMyPower BewareMyPower mentioned this pull request Jun 13, 2023
2 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants