Skip to content

Commit

Permalink
[C++] Remove the flaky and meaningless tests (#15271)
Browse files Browse the repository at this point in the history
Fixes #13849
Fixes #14848

### Motivation

#11570 adds a `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers` for
the case that some `sendAsync` calls that are invoked after `closeAsync`
is called in another thread must complete with `ResultAlreadyClosed`.
It's flaky because the synchronization between two threads is not
strict. This test uses `sendStartLatch` for the order of `sendAsync` and
`closeAsync`:

```
sendAsync 0,1,...,9 -> sendStartLatch is done -> closeAsync
```

However, it cannot guarantee the rest `sendAsync` calls happen after
`closeAsync` is called. If so, all `sendAsync` calls will complete with
`ResultOk`.

On the other hand, this test is meaningless because it requires strict
synchronization between two threads so there is no need to run
`sendAsync` and `closeAsync` in two threads.

The verification of this test is also wrong, see
#13849 (comment).
When `closeAsync` is called, the previous `sendAsync` calls might not
complete, so all `sendAsync` will complete with `ResultAlreadyClosed`,
not only those called after `closeAsync`.

In addition, this PR also tries to fix the flaky `testReferenceCount`,
which assumes too strictly.

### Modifications

- Remove `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers`
- Only check the reference count is greater than 0 instead of equal to 1

(cherry picked from commit eeea9ca)
  • Loading branch information
BewareMyPower committed Jul 26, 2022
1 parent d046a6d commit 70afb39
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 85 deletions.
2 changes: 1 addition & 1 deletion pulsar-client-cpp/tests/ClientTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ TEST(ClientTest, testReferenceCount) {
LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count());

readerWeakPtr = PulsarFriend::getReaderImplWeakPtr(reader);
ASSERT_EQ(readerWeakPtr.use_count(), 1);
ASSERT_TRUE(readerWeakPtr.use_count() > 0);
LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
}

Expand Down
85 changes: 1 addition & 84 deletions pulsar-client-cpp/tests/ProducerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,89 +159,6 @@ TEST(ProducerTest, testSendAsyncAfterCloseAsyncWithLazyProducers) {
ASSERT_EQ(ResultOk, result);
}

TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) {
// run sendAsync and closeAsync concurrently and verify that all sendAsync callbacks are called
// and that messages sent after closeAsync is invoked receive ResultAlreadyClosed.
for (int run = 0; run < 20; run++) {
LOG_INFO("Start of run " << run);
Client client(serviceUrl);
const std::string partitionedTopic =
"testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr));

int res = makePutRequest(
adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "10");
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

ProducerConfiguration producerConfiguration;
producerConfiguration.setLazyStartPartitionedProducers(true);
producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
producerConfiguration.setBatchingEnabled(true);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfiguration, producer));

int sendCount = 100;
std::vector<Promise<Result, MessageId>> promises(sendCount);
Promise<bool, Result> promiseClose;

// only call closeAsync once at least 10 messages have been sent
Latch sendStartLatch(10);
Latch closeLatch(1);
int closedAt = 0;

std::thread t1([&]() {
for (int i = 0; i < sendCount; i++) {
sendStartLatch.countdown();
Message msg = MessageBuilder().setContent("test").build();

if (closeLatch.getCount() == 0 && closedAt == 0) {
closedAt = i;
LOG_INFO("closedAt set to " << closedAt)
}

producer.sendAsync(msg, WaitForCallbackValue<MessageId>(promises[i]));
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
});

std::thread t2([&]() {
sendStartLatch.wait(std::chrono::milliseconds(1000));
LOG_INFO("Closing");
producer.closeAsync(WaitForCallback(promiseClose));
LOG_INFO("Close called");
closeLatch.countdown();
Result result;
promiseClose.getFuture().get(result);
ASSERT_EQ(ResultOk, result);
LOG_INFO("Closed");
});

t1.join();
t2.join();

// make sure that all messages after the moment when closeAsync was invoked
// return AlreadyClosed
for (int i = 0; i < sendCount; i++) {
LOG_DEBUG("Checking " << i)

// whether a message was sent successfully or not, it's callback
// must have been invoked
ASSERT_EQ(true, promises[i].isComplete());
MessageId mi;
Result res = promises[i].getFuture().get(mi);
LOG_DEBUG("Result is " << res);

// for the messages sent after closeAsync was invoked, they
// should all return ResultAlreadyClosed
if (i >= closedAt) {
ASSERT_EQ(ResultAlreadyClosed, res);
}
}

client.close();
LOG_INFO("End of run " << run);
}
}

TEST(ProducerTest, testBacklogQuotasExceeded) {
std::string ns = "public/test-backlog-quotas";
std::string topic = ns + "/testBacklogQuotasExceeded" + std::to_string(time(nullptr));
Expand Down Expand Up @@ -283,4 +200,4 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
ASSERT_EQ(ResultOk, client.createProducer(partition, producer));

client.close();
}
}

0 comments on commit 70afb39

Please sign in to comment.