Skip to content

Commit

Permalink
[improve][broker] Improve exception for topic does not have schema to…
Browse files Browse the repository at this point in the history
… check
  • Loading branch information
shibd committed Jun 25, 2024
1 parent 1517e63 commit 59654ab
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.pulsar.broker.service.TopicAttributes;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -1235,11 +1236,15 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
.filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
.toList().size())
.sum();
if (hasSchema
|| (!producers.isEmpty())
boolean isActive = (!producers.isEmpty())
|| (numActiveConsumersWithoutAutoSchema != 0)
|| ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
|| ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0;
if (hasSchema) {
return checkSchemaCompatibleForConsumer(schema);
} else if (isActive) {
return FutureUtil.failedFuture(new IncompatibleSchemaException(
"Failed to add schema to an active topic with empty(BYTES) schema: new schema type "
+ schema.getType()));
} else {
return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
Expand Down Expand Up @@ -4039,11 +4040,15 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
.filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
.toList().size())
.sum();
if (hasSchema
|| (userCreatedProducerCount > 0)
boolean isActive = (userCreatedProducerCount > 0)
|| (numActiveConsumersWithoutAutoSchema != 0)
|| (ledger.getTotalSize() != 0)) {
|| (ledger.getTotalSize() != 0);
if (hasSchema) {
return checkSchemaCompatibleForConsumer(schema);
} else if (isActive) {
return FutureUtil.failedFuture(new IncompatibleSchemaException(
"Failed to add schema to an active topic with empty(BYTES) schema: new schema type "
+ schema.getType()));
} else {
return addSchema(schema).thenCompose(schemaVersion ->
CompletableFuture.completedFuture(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -125,6 +126,11 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "topicDomain")
public static Object[] topicDomain() {
return new Object[] { "persistent://", "non-persistent://" };
}

@Test
public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{
final String tenant = PUBLIC_TENANT;
Expand Down Expand Up @@ -1336,19 +1342,19 @@ private void testIncompatibleSchema() throws Exception {
* the new consumer to register new schema. But before we can solve this problem, we need to modify
* "CmdProducer" to let the Broker know that the Producer uses a schema of type "AUTO_PRODUCE_BYTES".
*/
@Test
public void testAutoProduceAndSpecifiedConsumer() throws Exception {
@Test(dataProvider = "topicDomain")
public void testAutoProduceAndSpecifiedConsumer(String domain) throws Exception {
final String namespace = PUBLIC_TENANT + "/ns_" + randomName(16);
admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
final String topicName = "persistent://" + namespace + "/tp_" + randomName(16);
final String topicName = domain + namespace + "/tp_" + randomName(16);
admin.topics().createNonPartitionedTopic(topicName);

Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
try {
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub1").subscribe();
fail("Should throw ex: Topic does not have schema to check");
fail("Should throw ex: Failed to add schema to an active topic with empty(BYTES) schema");
} catch (Exception ex){
assertTrue(ex.getMessage().contains("Topic does not have schema to check"));
assertTrue(ex.getMessage().contains("Failed to add schema to an active topic with empty(BYTES) schema"));
}

// Cleanup.
Expand Down

0 comments on commit 59654ab

Please sign in to comment.