Skip to content

Commit

Permalink
make MLPendingAckStoreTest work correct, because broker limit max bat…
Browse files Browse the repository at this point in the history
…ch record must large than 9
  • Loading branch information
poorbarcode committed Sep 10, 2022
1 parent 00d08d7 commit 38fd979
Showing 1 changed file with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import org.mockito.stubbing.Answer;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -64,17 +66,21 @@ public class MLPendingAckStoreTest extends TransactionTestBase {

private int pendingAckLogIndexMinLag = 1;

@BeforeMethod
@BeforeClass
@Override
protected void setup() throws Exception {
setUpBase(1, 1, NAMESPACE1 + "/test", 0);
}

@BeforeMethod
public void beforeMethod() throws Exception {
String topic = NAMESPACE1 + "/test-txn-topic";
admin.topics().createNonPartitionedTopic(topic);
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic(topic, false).get().get();
getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(pendingAckLogIndexMinLag);
CompletableFuture<Subscription> subscriptionFuture = persistentTopic .createSubscription("test",
CommandSubscribe.InitialPosition.Earliest, false, null);
CommandSubscribe.InitialPosition.Earliest, false, null);
PersistentSubscription subscription = (PersistentSubscription) subscriptionFuture.get();
ManagedCursor managedCursor = subscription.getCursor();
this.managedCursorMock = spy(managedCursor);
Expand All @@ -88,7 +94,12 @@ protected void setup() throws Exception {
.getExecutor(this);
}

@AfterMethod
@AfterMethod(alwaysRun = true)
private void afterMethod() throws Exception {
admin.topics().delete("persistent://" + NAMESPACE1 + "/test-txn-topic", true);
}

@AfterClass
public void cleanup(){
super.internalCleanup();
}
Expand Down

0 comments on commit 38fd979

Please sign in to comment.