diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index a6f65f6da32848..f31e4b3908ade4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -66,6 +66,7 @@ import org.apache.pulsar.broker.service.TopicAttributes; import org.apache.pulsar.broker.service.TopicPolicyListener; import org.apache.pulsar.broker.service.TransportCnx; +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.client.api.MessageId; @@ -1235,11 +1236,15 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME) .toList().size()) .sum(); - if (hasSchema - || (!producers.isEmpty()) + boolean isActive = (!producers.isEmpty()) || (numActiveConsumersWithoutAutoSchema != 0) - || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) { + || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0; + if (hasSchema) { return checkSchemaCompatibleForConsumer(schema); + } else if (isActive) { + return FutureUtil.failedFuture(new IncompatibleSchemaException( + "Failed to add schema to an active topic with empty(BYTES) schema: new schema type " + + schema.getType())); } else { return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1983fa3c383e3c..407cc7256dd4c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -132,6 +132,7 @@ import org.apache.pulsar.broker.service.TransportCnx; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.broker.stats.ReplicationMetrics; @@ -4039,11 +4040,15 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME) .toList().size()) .sum(); - if (hasSchema - || (userCreatedProducerCount > 0) + boolean isActive = (userCreatedProducerCount > 0) || (numActiveConsumersWithoutAutoSchema != 0) - || (ledger.getTotalSize() != 0)) { + || (ledger.getTotalSize() != 0); + if (hasSchema) { return checkSchemaCompatibleForConsumer(schema); + } else if (isActive) { + return FutureUtil.failedFuture(new IncompatibleSchemaException( + "Failed to add schema to an active topic with empty(BYTES) schema: new schema type " + + schema.getType())); } else { return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index d21e853ba09824..ae9ea6d5ae6f48 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -96,6 +96,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -125,6 +126,11 @@ public void cleanup() throws Exception { super.internalCleanup(); } + @DataProvider(name = "topicDomain") + public static Object[] topicDomain() { + return new Object[] { "persistent://", "non-persistent://" }; + } + @Test public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{ final String tenant = PUBLIC_TENANT; @@ -1336,19 +1342,19 @@ private void testIncompatibleSchema() throws Exception { * the new consumer to register new schema. But before we can solve this problem, we need to modify * "CmdProducer" to let the Broker know that the Producer uses a schema of type "AUTO_PRODUCE_BYTES". */ - @Test - public void testAutoProduceAndSpecifiedConsumer() throws Exception { + @Test(dataProvider = "topicDomain") + public void testAutoProduceAndSpecifiedConsumer(String domain) throws Exception { final String namespace = PUBLIC_TENANT + "/ns_" + randomName(16); admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME)); - final String topicName = "persistent://" + namespace + "/tp_" + randomName(16); + final String topicName = domain + namespace + "/tp_" + randomName(16); admin.topics().createNonPartitionedTopic(topicName); Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create(); try { pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub1").subscribe(); - fail("Should throw ex: Topic does not have schema to check"); + fail("Should throw ex: Failed to add schema to an active topic with empty(BYTES) schema"); } catch (Exception ex){ - assertTrue(ex.getMessage().contains("Topic does not have schema to check")); + assertTrue(ex.getMessage().contains("Failed to add schema to an active topic with empty(BYTES) schema")); } // Cleanup.