Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-6576: Failed to create Consumer for the first time #822

Closed
sijie opened this issue Mar 20, 2020 · 0 comments
Closed

ISSUE-6576: Failed to create Consumer for the first time #822

sijie opened this issue Mar 20, 2020 · 0 comments

Comments

@sijie
Copy link
Member

sijie commented Mar 20, 2020

Original Issue: apache#6576


Describe the bug
I have 2 clusters, Beijing and Guangzhou IDC. I created a namespace and enabled geo-replication.
Under this namespace, I first successfully created a producer in Beijing IDC with topic name is enant-test/n-1/topic-1 and Schema.STRING, but when I create a consumer in Guangzhou with same schema, it keeps failing.
To Reproduce
Steps to reproduce the behavior:

  1. deploy Cluster A and B,then created a namespace tenant-test/n-1 and enabled geo-replication
  2. created the producer in ClusterA and send a message,topic name is enant-test/n-1/topic-1
  3. create the consumer in Cluster B with same topic
  4. See error org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check
    Screenshots
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1628)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)
        at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761)
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543)
        at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122)
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693)
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:303)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107)
        at com.kugou.fanxing.starduration.Application.main(Application.java:15)
Caused by: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Topic does not have schema to check
        at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:673)
        at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:97)
        at 

Desktop (please complete the following information):

  • OS:Pulsar 2.5.0

Additional context
I located the following source code based on the exception information

    @Override
    public CompletableFuture<Void> checkConsumerCompatibility(String schemaId, SchemaData schemaData,
                                                              SchemaCompatibilityStrategy strategy) {
        return getSchema(schemaId).thenCompose(existingSchema -> {
            if (existingSchema != null && !existingSchema.schema.isDeleted()) {
                    if (strategy == SchemaCompatibilityStrategy.BACKWARD ||
                            strategy == SchemaCompatibilityStrategy.FORWARD ||
                            strategy == SchemaCompatibilityStrategy.FORWARD_TRANSITIVE ||
                            strategy == SchemaCompatibilityStrategy.FULL) {
                        return checkCompatibilityWithLatest(schemaId, schemaData, SchemaCompatibilityStrategy.BACKWARD);
                    } else {
                        return checkCompatibilityWithAll(schemaId, schemaData, strategy);
                    }
            } else {
                return FutureUtil.failedFuture(new IncompatibleSchemaException("Topic does not have schema to check"));
            }
        });
    }

I checked path /schemas in zk according to the method getSchema and found that there is no schemaInfo for this topic.
I continued to read the source code and found the following code.
hasSchema || isActive() should be false,but ledger.getTotalSize() should also be 0,because I haven't sent any messages in cluster B.

    @Override
    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
        return hasSchema()
            .thenCompose((hasSchema) -> {
                    if (hasSchema || isActive() || ledger.getTotalSize() != 0) {
                        return checkSchemaCompatibleForConsumer(schema);
                    } else {
                        return addSchema(schema).thenCompose(schemaVersion ->
                                CompletableFuture.completedFuture(null));
                    }
                });
    }
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

No branches or pull requests

1 participant