Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][test] Improve TransactionEndToEndTest to reduce the execution time #17790

Merged
merged 2 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -96,14 +96,21 @@ public class TransactionEndToEndTest extends TransactionTestBase {
protected static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
protected static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
protected static final int NUM_PARTITIONS = 16;
@BeforeMethod
@BeforeClass
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
}

@AfterMethod(alwaysRun = true)
protected void resetTopicOutput() throws Exception {
admin.topics().deletePartitionedTopic(TOPIC_OUTPUT, true);
admin.topics().createPartitionedTopic(TOPIC_OUTPUT, TOPIC_PARTITION);
admin.topics().deletePartitionedTopic(TOPIC_MESSAGE_ACK_TEST, true);
admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
}

@AfterClass(alwaysRun = true)
protected void cleanup() {
super.internalCleanup();
}
Expand Down Expand Up @@ -167,6 +174,10 @@ private void produceCommitTest(boolean enableBatch) throws Exception {
message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);

// cleanup.
producer.close();
consumer.close();
resetTopicOutput();
log.info("message commit test enableBatch {}", enableBatch);
}

Expand All @@ -175,7 +186,6 @@ public void produceAbortTest() throws Exception {
Transaction txn = getTxn();
String subName = "test";

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(TOPIC_OUTPUT)
Expand All @@ -188,7 +198,6 @@ public void produceAbortTest() throws Exception {
producer.newMessage(txn).value(("Hello Txn - " + i).getBytes(UTF_8)).send();
}

@Cleanup
Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(TOPIC_OUTPUT)
Expand Down Expand Up @@ -253,6 +262,10 @@ public void produceAbortTest() throws Exception {
return flag;
});

// cleanup.
producer.close();
consumer.close();
resetTopicOutput();
log.info("finished test partitionAbortTest");
}

Expand Down Expand Up @@ -311,6 +324,11 @@ private void testAckWithTransactionReduceUnAckMessageCount(boolean enableBatch)
}
}
assertTrue(flag);

// cleanup.
producer.close();
consumer.close();
admin.topics().delete(topicName, true);
}

@Test
Expand Down Expand Up @@ -406,6 +424,11 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize,
Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException);
}
}

// cleanup.
producer.close();
consumer.close();
admin.topics().delete(normalTopic, true);
}

@Test
Expand All @@ -423,6 +446,11 @@ public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
String content = "test";
producer.send(content);
assertEquals(consumer.receive().getValue(), content);

// cleanup.
producer.close();
consumer.close();
admin.topics().delete(topicTwo, true);
}

@Test
Expand Down Expand Up @@ -536,6 +564,10 @@ public void txnMessageAckTest() throws Exception {
}
assertTrue(exist);

// cleanup.
producer.close();
consumer.close();
resetTopicOutput();
log.info("receive transaction messages count: {}", receiveCnt);
}

Expand Down Expand Up @@ -638,6 +670,11 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri
message = consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNull(message);
}

// cleanup.
producer.close();
consumer.close();
admin.topics().delete(normalTopic, true);
}

private Transaction getTxn() throws Exception {
Expand All @@ -648,25 +685,6 @@ private Transaction getTxn() throws Exception {
.get();
}

private void markDeletePositionCheck(String topic, String subName, boolean equalsWithLastConfirm) throws Exception {
for (int i = 0; i < TOPIC_PARTITION; i++) {
PersistentTopicInternalStats stats = null;
String checkTopic = TopicName.get(topic).getPartition(i).toString();
for (int j = 0; j < 10; j++) {
stats = admin.topics().getInternalStats(checkTopic, false);
if (stats.lastConfirmedEntry.equals(stats.cursors.get(subName).markDeletePosition)) {
break;
}
Thread.sleep(200);
}
if (equalsWithLastConfirm) {
Assert.assertEquals(stats.cursors.get(subName).markDeletePosition, stats.lastConfirmedEntry);
} else {
Assert.assertNotEquals(stats.cursors.get(subName).markDeletePosition, stats.lastConfirmedEntry);
}
}
}

@Test
public void txnMetadataHandlerRecoverTest() throws Exception {
String topic = NAMESPACE1 + "/tc-metadata-handler-recover";
Expand Down Expand Up @@ -714,6 +732,12 @@ public void txnMetadataHandlerRecoverTest() throws Exception {
Message<byte[]> message = consumer.receive();
Assert.assertNotNull(message);
}

// cleanup.
producer.close();
consumer.close();
recoverPulsarClient.close();
admin.topics().delete(topic, true);
}

@Test
Expand Down Expand Up @@ -748,9 +772,14 @@ public void produceTxnMessageOrderTest() throws Exception {
for (int i = 0; i < 1000; i++) {
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
Assert.assertEquals(Integer.valueOf(new String(message.getData())), new Integer(i));
Assert.assertEquals(Integer.valueOf(new String(message.getData())), Integer.valueOf(i));
}
}

// cleanup.
producer.close();
consumer.close();
admin.topics().delete(topic, true);
}

@Test
Expand Down Expand Up @@ -855,10 +884,20 @@ public void produceAndConsumeCloseStateTxnTest() throws Exception {
field.setAccessible(true);
TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxnSkipClientTimeout);
assertEquals(state, TransactionImpl.State.ERROR);

// cleanup.
timeoutTxn.abort();
producer.close();
consumer.close();
admin.topics().delete(topic, true);
}

@Test
public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{
Collection<TransactionMetadataStore> transactionMetadataStores =
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values();
long timeoutCountOriginal = transactionMetadataStores.stream()
.mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum();
TxnID txnID = pulsarServiceList.get(0).getTransactionMetadataStoreService()
.newTransaction(new TransactionCoordinatorID(0), 1).get();
Awaitility.await().until(() -> {
Expand All @@ -869,11 +908,9 @@ public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{
return true;
}
});
Collection<TransactionMetadataStore> transactionMetadataStores =
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values();
long timeoutCount = transactionMetadataStores.stream()
.mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum();
Assert.assertEquals(timeoutCount, 1);
Assert.assertEquals(timeoutCount, timeoutCountOriginal + 1);
}

@Test
Expand Down Expand Up @@ -914,6 +951,11 @@ public void transactionTimeoutTest() throws Exception {

assertEquals(reReceiveMessage.getMessageId(), message.getMessageId());

// cleanup.
consumeTimeoutTxn.abort();
producer.close();
consumer.close();
admin.topics().delete(topic, true);
}

@DataProvider(name = "ackType")
Expand Down Expand Up @@ -979,6 +1021,11 @@ public void txnTransactionRedeliverNullDispatcher(CommandAck.AckType ackType) th
}
txn.abort().get();
assertTrue(exist);

// cleanup.
producer.close();
consumer.close();
admin.topics().delete(topic, true);
}

@Test
Expand Down Expand Up @@ -1036,6 +1083,13 @@ public void oneTransactionOneTopicWithMultiSubTest() throws Exception {
}
}
assertTrue(flag);

// cleanup.
txn.abort().get();
producer.close();
consumer1.close();
consumer2.close();
admin.topics().delete(topic, true);
}

@Test
Expand Down Expand Up @@ -1070,6 +1124,11 @@ public void testTxnTimeOutInClient() throws Exception{
Assert.assertTrue(e.getCause() instanceof TransactionCoordinatorClientException
.InvalidTxnStatusException);
}

// cleanup.
producer.close();
consumer.close();
admin.topics().delete(topic, true);
}

@Test
Expand Down Expand Up @@ -1138,6 +1197,11 @@ public void testCumulativeAckRedeliverMessages() throws Exception {
// then redeliver will not receive any message
message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);

// cleanup.
producer.close();
consumer.close();
admin.topics().delete(topic, true);
}

@Test
Expand Down Expand Up @@ -1172,6 +1236,11 @@ public void testSendTxnMessageTimeout() throws Exception {
} catch (PulsarClientException ex) {
assertTrue(ex instanceof PulsarClientException.TimeoutException);
}

// cleanup.
transaction.abort().get();
producer.close();
admin.topics().delete(topic, true);
}

@Test
Expand Down Expand Up @@ -1218,6 +1287,12 @@ public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exce
// ack one message, the unack count is 4
assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
.get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4);

// cleanup.
txn.abort().get();
consumer.close();
producer.close();
admin.topics().delete(topic, true);
}

@Test
Expand Down Expand Up @@ -1274,6 +1349,14 @@ public void testSendTxnAckMessageToDLQ() throws Exception {
assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 3);

assertEquals(value, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));

// cleanup.
consumer.close();
deadLetterConsumer.close();
producer.close();
admin.topics().delete(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
topic, subName), true);
admin.topics().delete(topic, true);
}

@Test
Expand Down Expand Up @@ -1341,6 +1424,14 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception {

assertEquals(value1, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
assertEquals(value2, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));

// cleanup.
consumer.close();
deadLetterConsumer.close();
producer.close();
admin.topics().delete(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
topic, subName), true);
admin.topics().delete(topic, true);
}

@Test
Expand Down Expand Up @@ -1400,5 +1491,11 @@ public void testDelayedTransactionMessages() throws Exception {
for (int i = 0; i < 10; i++) {
assertTrue(receivedMsgs.contains("msg-" + i));
}

// cleanup.
sharedConsumer.close();
failoverConsumer.close();
producer.close();
admin.topics().delete(topic, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
Expand All @@ -30,7 +30,7 @@
@Test(groups = "flaky")
public class TransactionEndToEndWithoutBatchIndexAckTest extends TransactionEndToEndTest {

@BeforeMethod
@BeforeClass
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(false);
setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
Expand Down