diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index df6b1b8a8f92f..57d709e9768c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -32,9 +33,12 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -130,6 +134,50 @@ public void testClientVersion() throws Exception { } @Test + public void testCnxReceiveSendError() throws Exception { + final String topicOne = "persistent://" + NAMESPACE + "/testCnxReceiveSendError-one"; + final String topicTwo = "persistent://" + NAMESPACE + "/testCnxReceiveSendError-two"; + + PulsarClient client = PulsarClient.builder().serviceUrl(lookupUrl.toString()).connectionsPerBroker(1).build(); + Producer producerOne = client.newProducer(Schema.STRING) + .topic(topicOne) + .create(); + Producer producerTwo = client.newProducer(Schema.STRING) + .topic(topicTwo) + .create(); + ClientCnx cnxOne = ((ProducerImpl) producerOne).getClientCnx(); + ClientCnx cnxTwo = ((ProducerImpl) producerTwo).getClientCnx(); + + // simulate a sending error + cnxOne.handleSendError(Commands.newSendErrorCommand(((ProducerImpl) producerOne).producerId, + 10, ServerError.PersistenceError, "persistent error").getSendError()); + + // two producer use the same cnx + Assert.assertEquals(cnxOne, cnxTwo); + + // the cnx will not change + try { + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + (((ProducerImpl) producerOne).getClientCnx() != null + && !cnxOne.equals(((ProducerImpl) producerOne).getClientCnx())) + || !cnxTwo.equals(((ProducerImpl) producerTwo).getClientCnx())); + Assert.fail(); + } catch (Throwable e) { + Assert.assertTrue(e instanceof ConditionTimeoutException); + } + + // two producer use the same cnx + Assert.assertEquals(((ProducerImpl) producerTwo).getClientCnx(), + ((ProducerImpl) producerOne).getClientCnx()); + + // producer also can send message + producerOne.send("test"); + producerTwo.send("test"); + producerTwo.close(); + producerOne.close(); + client.close(); + } + public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Exception { final String topic = BrokerTestUtil.newUniqueName( "persistent://" + NAMESPACE + "/tp"); admin.topics().createNonPartitionedTopic(topic); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 6f343a2ee5855..24163c631ffe9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -783,11 +783,9 @@ protected void handleSendError(CommandSendError sendError) { case NotAllowedError: producers.get(producerId).recoverNotAllowedError(sequenceId, sendError.getMessage()); break; - default: - // By default, for transient error, let the reconnection logic - // to take place and re-establish the produce again - ctx.close(); + // don't close this ctx, otherwise it will close all consumers and producers which use this ctx + producers.get(producerId).connectionClosed(this, Optional.empty(), Optional.empty()); } }