From db3524d0539c4bd4caca72dad6ed7f62e888136a Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 28 Apr 2024 22:15:54 +0800 Subject: [PATCH 1/6] Do not dispatch txn aborted messages to reader. --- .../service/persistent/PersistentTopic.java | 9 ++++- .../broker/transaction/TransactionTest.java | 33 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 95a2b64908a73..3b01b29f6e0f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3647,7 +3647,14 @@ public CompletableFuture getLastDispatchablePosition() { return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer - return !Markers.isServerOnlyMarker(md); + if (Markers.isServerOnlyMarker(md)) { + return false; + } else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) { + // Filter-out transaction aborted messages. + TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits()); + return !isTxnAborted(txnID, (PositionImpl) entry.getPosition()); + } + return true; }, maxReadPosition); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index e45924e8bb4f2..1d09845640aec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1946,4 +1946,37 @@ public void testDelayedDeliveryExceedsMaxDelay() throws Exception { + maxDeliveryDelayInMillis + " milliseconds"); } } + + @Test + public void testPersistentTopicGetLastDispatchablePositionWithTxn() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/testPersistentTopicGetLastDispatchablePositionWithTxn"; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .create(); + + + // send a normal message + MessageIdImpl msgId = (MessageIdImpl) producer.send(UUID.randomUUID().toString()); + + // send 3 txn messages + Transaction txn = pulsarClient.newTransaction().build().get(); + producer.newMessage(txn).value(UUID.randomUUID().toString()).send(); + producer.newMessage(txn).value(UUID.randomUUID().toString()).send(); + producer.newMessage(txn).value(UUID.randomUUID().toString()).send(); + + // abort the txn + txn.abort().get(); + + BrokerService brokerService = pulsarTestContexts.get(0).getBrokerService(); + PersistentTopic persistentTopic = (PersistentTopic) brokerService.getTopicReference(topic).get(); + + // get last dispatchable position + PositionImpl lastDispatchablePosition = (PositionImpl) persistentTopic.getLastDispatchablePosition().get(); + + // the last dispatchable position should be the message id of the normal message + assertEquals(lastDispatchablePosition, PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId())); + } } From 5dcb744337112bca1daadf0d4730d8ad0cff46e2 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 28 Apr 2024 22:19:15 +0800 Subject: [PATCH 2/6] improve test --- .../broker/transaction/TransactionTest.java | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 1d09845640aec..4f644f93297d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -67,10 +67,7 @@ import lombok.Lombok; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.Bytes; -import org.apache.bookkeeper.mledger.AsyncCallbacks; -import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.*; import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; @@ -1959,7 +1956,8 @@ public void testPersistentTopicGetLastDispatchablePositionWithTxn() throws Excep // send a normal message - MessageIdImpl msgId = (MessageIdImpl) producer.send(UUID.randomUUID().toString()); + String body = UUID.randomUUID().toString(); + MessageIdImpl msgId = (MessageIdImpl) producer.send(body); // send 3 txn messages Transaction txn = pulsarClient.newTransaction().build().get(); @@ -1978,5 +1976,17 @@ public void testPersistentTopicGetLastDispatchablePositionWithTxn() throws Excep // the last dispatchable position should be the message id of the normal message assertEquals(lastDispatchablePosition, PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId())); + + @Cleanup + Reader reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + List> messages = new ArrayList<>(); + while (reader.hasMessageAvailable()) { + messages.add(reader.readNext()); + } + assertEquals(messages.size(), 1); + assertEquals(messages.get(0).getValue(), body); } } From a1271621859d181bcdfa2d7806441e64fdf4c2df Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 29 Apr 2024 09:30:30 +0800 Subject: [PATCH 3/6] improve test --- .../broker/transaction/TransactionTest.java | 16 ++++++----- .../buffer/TopicTransactionBufferTest.java | 27 ++++++++++--------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 4f644f93297d8..3a0b5aaa4b684 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1954,6 +1954,9 @@ public void testPersistentTopicGetLastDispatchablePositionWithTxn() throws Excep .enableBatching(false) .create(); + BrokerService brokerService = pulsarTestContexts.get(0).getBrokerService(); + PersistentTopic persistentTopic = (PersistentTopic) brokerService.getTopicReference(topic).get(); + // send a normal message String body = UUID.randomUUID().toString(); @@ -1965,15 +1968,16 @@ public void testPersistentTopicGetLastDispatchablePositionWithTxn() throws Excep producer.newMessage(txn).value(UUID.randomUUID().toString()).send(); producer.newMessage(txn).value(UUID.randomUUID().toString()).send(); - // abort the txn - txn.abort().get(); - - BrokerService brokerService = pulsarTestContexts.get(0).getBrokerService(); - PersistentTopic persistentTopic = (PersistentTopic) brokerService.getTopicReference(topic).get(); - // get last dispatchable position PositionImpl lastDispatchablePosition = (PositionImpl) persistentTopic.getLastDispatchablePosition().get(); + // the last dispatchable position should be the message id of the normal message + assertEquals(lastDispatchablePosition, PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId())); + // abort the txn + txn.abort().get(5, TimeUnit.SECONDS); + + // get last dispatchable position + lastDispatchablePosition = (PositionImpl) persistentTopic.getLastDispatchablePosition().get(); // the last dispatchable position should be the message id of the normal message assertEquals(lastDispatchablePosition, PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId())); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index b0903b00be380..f698a205ef46b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -280,9 +280,9 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception { for (int i = 0; i < 3; i++) { expectedLastMessageID = (MessageIdImpl) producer.newMessage().send(); } - assertMessageId(consumer, expectedLastMessageID); + assertGetLastMessageId(consumer, expectedLastMessageID); // 2.2 Case2: send 2 ongoing transactional messages and 2 original messages. - // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|. + // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5. Transaction txn1 = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.HOURS) .build() @@ -291,25 +291,28 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception { .withTransactionTimeout(5, TimeUnit.HOURS) .build() .get(); + + // |1:0|1:1|1:2|txn1:1:3| producer.newMessage(txn1).send(); - // expectedLastMessageID1 == 1:4 + + // |1:0|1:1|1:2|txn1:1:3|1:4| MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send(); + + // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5| producer.newMessage(txn2).send(); - // expectedLastMessageID2 == 1:6 - MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) producer.newMessage().send(); // 2.2.1 Last message ID will not change when txn1 and txn2 do not end. - assertMessageId(consumer, expectedLastMessageID); + assertGetLastMessageId(consumer, expectedLastMessageID); // 2.2.2 Last message ID will update to 1:4 when txn1 committed. - // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7| + // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6| txn1.commit().get(5, TimeUnit.SECONDS); - assertMessageId(consumer, expectedLastMessageID1); + assertGetLastMessageId(consumer, expectedLastMessageID1); - // 2.2.3 Last message ID will update to 1:6 when txn2 aborted. - // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|tx2:abort->1:8| + // 2.2.3 Last message ID will still to 1:4 when txn2 aborted. + // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7| txn2.abort().get(5, TimeUnit.SECONDS); - assertMessageId(consumer, expectedLastMessageID2); + assertGetLastMessageId(consumer, expectedLastMessageID1); } /** @@ -368,7 +371,7 @@ private void triggerLedgerSwitch(String topicName) throws Exception{ }); } - private void assertMessageId(Consumer consumer, MessageIdImpl expected) throws Exception { + private void assertGetLastMessageId(Consumer consumer, MessageIdImpl expected) throws Exception { TopicMessageIdImpl actual = (TopicMessageIdImpl) consumer.getLastMessageIds().get(0); assertEquals(expected.getEntryId(), actual.getEntryId()); assertEquals(expected.getLedgerId(), actual.getLedgerId()); From 5f4d511b8aea3b78dab5f81049431d552be14d8c Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 29 Apr 2024 12:52:48 +0800 Subject: [PATCH 4/6] improve test & fix code --- .../service/persistent/PersistentTopic.java | 31 +++++++------------ .../broker/transaction/TransactionTest.java | 31 ++++++++++++------- .../buffer/TopicTransactionBufferTest.java | 9 ++++++ 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3b01b29f6e0f3..bec894b5b5e4b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3638,25 +3638,18 @@ public Position getLastPosition() { @Override public CompletableFuture getLastDispatchablePosition() { - PositionImpl maxReadPosition = getMaxReadPosition(); - // If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions. - // so return `maxRedPosition` directly. - if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) { - return CompletableFuture.completedFuture(maxReadPosition); - } else { - return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { - MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); - // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer - if (Markers.isServerOnlyMarker(md)) { - return false; - } else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) { - // Filter-out transaction aborted messages. - TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits()); - return !isTxnAborted(txnID, (PositionImpl) entry.getPosition()); - } - return true; - }, maxReadPosition); - } + return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { + MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); + // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer + if (Markers.isServerOnlyMarker(md)) { + return false; + } else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) { + // Filter-out transaction aborted messages. + TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits()); + return !isTxnAborted(txnID, (PositionImpl) entry.getPosition()); + } + return true; + }, getMaxReadPosition()); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 3a0b5aaa4b684..d2489c231f19e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -105,17 +105,7 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.ReaderBuilder; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.ClientCnx; @@ -1981,16 +1971,35 @@ public void testPersistentTopicGetLastDispatchablePositionWithTxn() throws Excep // the last dispatchable position should be the message id of the normal message assertEquals(lastDispatchablePosition, PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId())); + @Cleanup Reader reader = pulsarClient.newReader(Schema.STRING) .topic(topic) .startMessageId(MessageId.earliest) .create(); + Transaction txn1 = pulsarClient.newTransaction().build().get(); + producer.newMessage(txn1).value(UUID.randomUUID().toString()).send(); + producer.newMessage(txn1).value(UUID.randomUUID().toString()).send(); + producer.newMessage(txn1).value(UUID.randomUUID().toString()).send(); List> messages = new ArrayList<>(); while (reader.hasMessageAvailable()) { messages.add(reader.readNext()); } assertEquals(messages.size(), 1); assertEquals(messages.get(0).getValue(), body); + + txn1.abort().get(5, TimeUnit.SECONDS); + + @Cleanup + Reader reader1 = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + List> messages1 = new ArrayList<>(); + while (reader1.hasMessageAvailable()) { + messages1.add(reader1.readNext()); + } + assertEquals(messages1.size(), 1); + assertEquals(messages1.get(0).getValue(), body); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index f698a205ef46b..f93cfbcdc50f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -313,6 +313,15 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception { // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7| txn2.abort().get(5, TimeUnit.SECONDS); assertGetLastMessageId(consumer, expectedLastMessageID1); + + // Handle the case of the maxReadPosition < lastPosition, but it's an aborted transactional message. + Transaction txn3 = pulsarClient.newTransaction() + .build() + .get(); + producer.newMessage(txn3).send(); + assertGetLastMessageId(consumer, expectedLastMessageID1); + txn3.abort().get(5, TimeUnit.SECONDS); + assertGetLastMessageId(consumer, expectedLastMessageID1); } /** From 60f6ffa5de2ef48d54490ccca640bf3449e0230d Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 1 May 2024 10:54:19 +0800 Subject: [PATCH 5/6] Remove duplicate method call --- .../apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java index cd8671b0e6289..cd5d449c83350 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java @@ -63,8 +63,7 @@ public void readEntryComplete(Entry entry, Object ctx) { if (!ledger.isValidPosition(previousPosition)) { future.complete(previousPosition); } else { - internalAsyncReverseFindPositionOneByOne(ledger, predicate, - ledger.getPreviousPosition((PositionImpl) position), future); + internalAsyncReverseFindPositionOneByOne(ledger, predicate, previousPosition, future); } } catch (Exception e) { future.completeExceptionally(e); From 5cbddb4f660e1747f2d9b08f4bf5761f154ce909 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 5 May 2024 12:16:19 +0800 Subject: [PATCH 6/6] address comment --- .../mledger/util/ManagedLedgerImplUtils.java | 16 ++++++---------- .../broker/transaction/TransactionTest.java | 17 +++++++++++++++-- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java index cd5d449c83350..01de115290ab9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java @@ -38,11 +38,7 @@ public static CompletableFuture asyncGetLastValidPosition(final Manage final Predicate predicate, final PositionImpl startPosition) { CompletableFuture future = new CompletableFuture<>(); - if (!ledger.isValidPosition(startPosition)) { - future.complete(startPosition); - } else { - internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); - } + internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); return future; } @@ -50,6 +46,10 @@ private static void internalAsyncReverseFindPositionOneByOne(final ManagedLedger final Predicate predicate, final PositionImpl position, final CompletableFuture future) { + if (!ledger.isValidPosition(position)) { + future.complete(position); + return; + } ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { @@ -60,11 +60,7 @@ public void readEntryComplete(Entry entry, Object ctx) { return; } PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position); - if (!ledger.isValidPosition(previousPosition)) { - future.complete(previousPosition); - } else { - internalAsyncReverseFindPositionOneByOne(ledger, predicate, previousPosition, future); - } + internalAsyncReverseFindPositionOneByOne(ledger, predicate, previousPosition, future); } catch (Exception e) { future.completeExceptionally(e); } finally { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index f461d34a4b175..e8c15d193a22d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -68,7 +68,10 @@ import lombok.Lombok; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.Bytes; -import org.apache.bookkeeper.mledger.*; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; @@ -109,7 +112,17 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.ClientCnx;