Skip to content

Commit

Permalink
[improve][client] Add exception handle for client send error (#23038)
Browse files Browse the repository at this point in the history
### Motivation

- when a producer's send returns an error, the client currently closes the cnx the producer is using
- if the current cnx is closed, all producers and consumers which use this cnx will be closed and have to reconnect
(https://github.com/apache/pulsar/blob/5c6602cbb3660a696bf960f2847aac1a2ae037d2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L323-L345)
- this affects a lot of producers and consumers
- closing only this producer and letting it reconnect is enough; we don't need to close the whole cnx


### Modifications

When a send_error is received, close only the current producer's connection; the producer will then reconnect automatically.
  • Loading branch information
congbobo184 authored Jul 23, 2024
1 parent 81aed6c commit fca9c5c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -130,6 +134,50 @@ public void testClientVersion() throws Exception {
}

/**
 * Verifies that a send error received for one producer does not tear down the
 * connection it shares with another producer.
 *
 * <p>Two producers are created on a client configured with one connection per broker.
 * A {@code PersistenceError} send error is then injected for the first producer, and the
 * test checks that, within the wait window, neither producer ends up on a different
 * {@code ClientCnx}, that both still report the same connection afterwards, and that
 * both can still send a message.
 *
 * @throws Exception if creating the client or producers, or sending a message, fails
 */
@Test
public void testCnxReceiveSendError() throws Exception {
    final String topicOne = "persistent://" + NAMESPACE + "/testCnxReceiveSendError-one";
    final String topicTwo = "persistent://" + NAMESPACE + "/testCnxReceiveSendError-two";

    // try-with-resources releases the producers and the client even when an assertion
    // fails; resources are closed in reverse order (producerTwo, producerOne, client).
    try (PulsarClient client = PulsarClient.builder()
                 .serviceUrl(lookupUrl.toString())
                 .connectionsPerBroker(1)
                 .build();
         Producer<String> producerOne = client.newProducer(Schema.STRING)
                 .topic(topicOne)
                 .create();
         Producer<String> producerTwo = client.newProducer(Schema.STRING)
                 .topic(topicTwo)
                 .create()) {
        final ProducerImpl<?> producerImplOne = (ProducerImpl<?>) producerOne;
        final ProducerImpl<?> producerImplTwo = (ProducerImpl<?>) producerTwo;
        final ClientCnx cnxOne = producerImplOne.getClientCnx();
        final ClientCnx cnxTwo = producerImplTwo.getClientCnx();

        // precondition: the two producers use the same cnx
        Assert.assertEquals(cnxOne, cnxTwo);

        // simulate a sending error
        cnxOne.handleSendError(Commands.newSendErrorCommand(producerImplOne.producerId,
                10, ServerError.PersistenceError, "persistent error").getSendError());

        // The cnx must not change: waiting for "some producer moved to a different cnx"
        // is expected to time out. A null cnx on producerOne is tolerated here.
        Assert.assertThrows(ConditionTimeoutException.class, () ->
                Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() ->
                        (producerImplOne.getClientCnx() != null
                                && !cnxOne.equals(producerImplOne.getClientCnx()))
                                || !cnxTwo.equals(producerImplTwo.getClientCnx())));

        // the two producers still use the same cnx
        Assert.assertEquals(producerImplTwo.getClientCnx(), producerImplOne.getClientCnx());

        // both producers can still send messages
        producerOne.send("test");
        producerTwo.send("test");
    }
}

public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Exception {
final String topic = BrokerTestUtil.newUniqueName( "persistent://" + NAMESPACE + "/tp");
admin.topics().createNonPartitionedTopic(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,11 +783,9 @@ protected void handleSendError(CommandSendError sendError) {
case NotAllowedError:
producers.get(producerId).recoverNotAllowedError(sequenceId, sendError.getMessage());
break;

default:
// By default, for transient error, let the reconnection logic
// to take place and re-establish the produce again
ctx.close();
// don't close this ctx, otherwise it will close all consumers and producers which use this ctx
producers.get(producerId).connectionClosed(this, Optional.empty(), Optional.empty());
}
}

Expand Down

0 comments on commit fca9c5c

Please sign in to comment.