
Failed to create Consumer for the first time #6576

Closed
zyllt opened this issue Mar 20, 2020 · 7 comments
Labels: type/bug

@zyllt

zyllt commented Mar 20, 2020

Describe the bug
I have two clusters, one in the Beijing IDC and one in the Guangzhou IDC. I created a namespace and enabled geo-replication on it.
Under this namespace, I first successfully created a producer in the Beijing IDC with topic name tenant-test/n-1/topic-1 and Schema.STRING, but creating a consumer in Guangzhou with the same schema keeps failing.
To Reproduce
Steps to reproduce the behavior:

  1. Deploy clusters A and B, then create the namespace tenant-test/n-1 and enable geo-replication on it.
  2. Create a producer in cluster A on topic tenant-test/n-1/topic-1 and send a message.
  3. Create a consumer in cluster B on the same topic.
  4. See error org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check
Screenshots
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1628)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)
        at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761)
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543)
        at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122)
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693)
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:303)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107)
        at com.kugou.fanxing.starduration.Application.main(Application.java:15)
Caused by: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check
        at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:673)
        at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:97)
        at 

Desktop (please complete the following information):

  • OS:Pulsar 2.5.0

Additional context
I located the following source code based on the exception information:

    @Override
    public CompletableFuture<Void> checkConsumerCompatibility(String schemaId, SchemaData schemaData,
                                                              SchemaCompatibilityStrategy strategy) {
        return getSchema(schemaId).thenCompose(existingSchema -> {
            if (existingSchema != null && !existingSchema.schema.isDeleted()) {
                    if (strategy == SchemaCompatibilityStrategy.BACKWARD ||
                            strategy == SchemaCompatibilityStrategy.FORWARD ||
                            strategy == SchemaCompatibilityStrategy.FORWARD_TRANSITIVE ||
                            strategy == SchemaCompatibilityStrategy.FULL) {
                        return checkCompatibilityWithLatest(schemaId, schemaData, SchemaCompatibilityStrategy.BACKWARD);
                    } else {
                        return checkCompatibilityWithAll(schemaId, schemaData, strategy);
                    }
            } else {
                return FutureUtil.failedFuture(new IncompatibleSchemaException("Topic does not have schema to check"));
            }
        });
    }

I checked the /schemas path in ZooKeeper, which is what the getSchema method reads, and found that there is no schemaInfo for this topic.
I continued reading the source code and found the following method.
I would expect hasSchema || isActive() to be false, and ledger.getTotalSize() to be 0 as well, because I have not sent any messages in cluster B.

    @Override
    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
        return hasSchema()
            .thenCompose((hasSchema) -> {
                    if (hasSchema || isActive() || ledger.getTotalSize() != 0) {
                        return checkSchemaCompatibleForConsumer(schema);
                    } else {
                        return addSchema(schema).thenCompose(schemaVersion ->
                                CompletableFuture.completedFuture(null));
                    }
                });
    }
@codelipenghui
Contributor

codelipenghui commented Mar 21, 2020

@zyllt Thanks for your feedback. I think the cause is the isActive() method. The replicator also creates a subscription on the topic, so isActive() returns true. I think we don't need to check whether there are subscriptions on the topic, since users can create subscriptions with pulsar-admin and geo-replication also creates a subscription on the topic.

@zplinuxlover
Contributor

I will fix this issue

@zyllt
Author

zyllt commented Mar 21, 2020

@codelipenghui Thanks for your reply. I originally thought the cause was ledger.getTotalSize() being nonzero because of geo-replication, but after your reply I think it is indeed the isActive() check. I will verify it.

@zplinuxlover
Contributor

The message has been replicated to the remote cluster, so ledger.getTotalSize() != 0 is true.
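
The two observations so far (the replicator's subscription makes isActive() return true, and replicated messages make ledger.getTotalSize() nonzero) can be put together in a small stand-alone model of the quoted condition. This is only a sketch: `SchemaCheckModel`, `TopicState` and `Outcome` are hypothetical names invented for illustration, not Pulsar classes, and the model assumes that the compatibility check fails whenever no schema is stored, as the quoted checkConsumerCompatibility does.

```java
/** Stand-alone model of the branch quoted in this issue. Not Pulsar code. */
final class SchemaCheckModel {

    /** Hypothetical snapshot of the three values the quoted condition reads. */
    static final class TopicState {
        final boolean hasSchema;
        final boolean isActive;
        final long ledgerTotalSize;

        TopicState(boolean hasSchema, boolean isActive, long ledgerTotalSize) {
            this.hasSchema = hasSchema;
            this.isActive = isActive;
            this.ledgerTotalSize = ledgerTotalSize;
        }
    }

    enum Outcome { SCHEMA_ADDED, CHECKED_AGAINST_EXISTING, FAILED_NO_SCHEMA }

    /** Mirrors: if (hasSchema || isActive() || ledger.getTotalSize() != 0) check; else addSchema. */
    static Outcome onConsumerSubscribe(TopicState s) {
        boolean mustCheck = s.hasSchema || s.isActive || s.ledgerTotalSize != 0;
        if (!mustCheck) {
            // Idle topic without a schema: the consumer's schema is registered.
            return Outcome.SCHEMA_ADDED;
        }
        // The check branch needs a stored schema; without one it fails with
        // "Topic does not have schema to check".
        return s.hasSchema ? Outcome.CHECKED_AGAINST_EXISTING : Outcome.FAILED_NO_SCHEMA;
    }

    public static void main(String[] args) {
        // Truly idle topic in cluster B: no schema, no subscription, no data.
        System.out.println(onConsumerSubscribe(new TopicState(false, false, 0L)));
        // Only the replicator's subscription exists.
        System.out.println(onConsumerSubscribe(new TopicState(false, true, 0L)));
        // Only replicated data exists (128 is an arbitrary nonzero size).
        System.out.println(onConsumerSubscribe(new TopicState(false, false, 128L)));
    }
}
```

In this model either condition alone is enough to send the consumer into the check branch, so dropping the isActive() test would not by itself help once a message has been replicated.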

@codelipenghui
Contributor

@zyllt @zplinuxlover We need a PIP to fix this issue. Currently, Pulsar does not replicate schemas between Pulsar clusters. The schema info is stored in the local ZooKeeper and BookKeeper, so the new proposal would cover replicating schemas between clusters.
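
To make the storage point concrete, here is a toy model in which each cluster keeps its own schema map and replication copies only messages. Everything in it is an assumption made for illustration: `GeoSchemaModel`, `Cluster`, `replicate` and the `includeSchema` flag are hypothetical, and the flag only stands in for the idea of replicating schemas, not for how any actual fix is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of cluster-local schema storage. Not Pulsar code. */
final class GeoSchemaModel {

    static final class Cluster {
        final String name;
        // Stands in for the schema info kept in the cluster's local ZooKeeper and BookKeeper.
        final Map<String, String> localSchemas = new HashMap<>();
        final Map<String, List<String>> messages = new HashMap<>();

        Cluster(String name) {
            this.name = name;
        }

        /** A local producer registers its schema (if none exists) and writes a message. */
        void produce(String topic, String schemaType, String payload) {
            localSchemas.putIfAbsent(topic, schemaType);
            messages.computeIfAbsent(topic, t -> new ArrayList<>()).add(payload);
        }

        /** In this model a schema-carrying consumer is rejected when the topic holds data but no schema. */
        boolean canSubscribe(String topic) {
            boolean hasData = !messages.getOrDefault(topic, Collections.emptyList()).isEmpty();
            return localSchemas.containsKey(topic) || !hasData;
        }
    }

    /** Copies messages to the remote cluster; copies the schema only when includeSchema is set. */
    static void replicate(Cluster from, Cluster to, String topic, boolean includeSchema) {
        List<String> source = from.messages.getOrDefault(topic, Collections.emptyList());
        to.messages.computeIfAbsent(topic, t -> new ArrayList<>()).addAll(source);
        if (includeSchema && from.localSchemas.containsKey(topic)) {
            to.localSchemas.putIfAbsent(topic, from.localSchemas.get(topic));
        }
    }

    public static void main(String[] args) {
        String topic = "tenant-test/n-1/topic-1";

        Cluster a = new Cluster("A");
        Cluster b = new Cluster("B");
        a.produce(topic, "STRING", "hello");
        replicate(a, b, topic, false); // behaviour described in this issue
        System.out.println("messages only: " + b.canSubscribe(topic));

        Cluster a2 = new Cluster("A");
        Cluster b2 = new Cluster("B");
        a2.produce(topic, "STRING", "hello");
        replicate(a2, b2, topic, true); // hypothetical schema replication
        System.out.println("messages and schema: " + b2.canSubscribe(topic));
    }
}
```

The first scenario matches the report: cluster B holds replicated data but no schema entry, so the consumer is rejected. The second shows why carrying the schema along with the data would remove the failure in this model.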

@codelipenghui codelipenghui modified the milestones: 2.6.0, 2.7.0 May 19, 2020
@codelipenghui codelipenghui modified the milestones: 2.7.0, 2.8.0 Nov 4, 2020
@codelipenghui codelipenghui modified the milestones: 2.8.0, 2.9.0 May 21, 2021
@eolivelli eolivelli modified the milestones: 2.9.0, 2.10.0 Oct 6, 2021
@JasonMing

Any exciting news?

@codelipenghui
Contributor

Fixed by #11441
