Skip to content

Commit

Permalink
[fix][broker] Restore solution for addressing certain topic unloading…
Browse files Browse the repository at this point in the history
… race conditions

- solution was introduced in apache#17526
- however, it was accidentially replaced with a call to the incorrect
  method signature in apache#17736
  • Loading branch information
lhotari committed Jun 7, 2023
1 parent d7186a6 commit 3463fb9
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2227,10 +2227,6 @@ public AuthorizationService getAuthorizationService() {
return authorizationService;
}

public CompletableFuture<Void> removeTopicFromCache(String topicName) {
return removeTopicFutureFromCache(topicName, null);
}

public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
Optional<CompletableFuture<Optional<Topic>>> createTopicFuture = findTopicFutureInCache(topic);
if (createTopicFuture.isEmpty()){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}

private void disposeTopic(CompletableFuture<?> closeFuture) {
brokerService.removeTopicFromCache(topic)
brokerService.removeTopicFromCache(PersistentTopic.this)
.thenRun(() -> {
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
// (3) remove topic and managed-ledger from broker which means topic is not closed gracefully
consumer.close();
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic1);
pulsar.getBrokerService().removeTopicFromCache(topic);
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testSkipCorruptDataLedger() throws Exception {

// clean managed-ledger and recreate topic to clean any data from the cache
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic1);
pulsar.getBrokerService().removeTopicFromCache(topic);
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
Expand Down

0 comments on commit 3463fb9

Please sign in to comment.