Skip to content

Commit

Permalink
[Fix][Tiered Storage] Eagerly Delete Offloaded Segments On Topic Dele…
Browse files Browse the repository at this point in the history
…tion (apache#15914)

* Truncate topic before deletion to avoid orphaned offloaded ledgers

* CR feedback
  • Loading branch information
dlg99 authored Sep 29, 2022
1 parent 3de690d commit 9026d19
Show file tree
Hide file tree
Showing 15 changed files with 666 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
void delete(String name) throws InterruptedException, ManagedLedgerException;

/**
* Delete a managed ledger. If it's not open, it's metadata will get regardless deleted.
*
* @param name
* @throws InterruptedException
* @throws ManagedLedgerException
*/
void delete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture)
throws InterruptedException, ManagedLedgerException;

/**
* Delete a managed ledger. If it's not open, it's metadata will get regardless deleted.
*
Expand All @@ -154,6 +164,16 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx);

/**
* Delete a managed ledger. If it's not open, it's metadata will get regardless deleted.
*
* @param name
* @throws InterruptedException
* @throws ManagedLedgerException
*/
void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx);

/**
* Releases all the resources maintained by the ManagedLedgerFactory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -71,13 +72,15 @@
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -802,12 +805,18 @@ public void operationFailed(MetaStoreException e) {

@Override
public void delete(String name) throws InterruptedException, ManagedLedgerException {
delete(name, CompletableFuture.completedFuture(null));
}

@Override
public void delete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture)
throws InterruptedException, ManagedLedgerException {
class Result {
ManagedLedgerException e = null;
}
final Result r = new Result();
final CountDownLatch latch = new CountDownLatch(1);
asyncDelete(name, new DeleteLedgerCallback() {
asyncDelete(name, mlConfigFuture, new DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
latch.countDown();
Expand All @@ -829,10 +838,16 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {

@Override
public void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx) {
asyncDelete(name, CompletableFuture.completedFuture(null), callback, ctx);
}

@Override
public void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx) {
CompletableFuture<ManagedLedgerImpl> future = ledgers.get(name);
if (future == null) {
// Managed ledger does not exist and we're not currently trying to open it
deleteManagedLedger(name, callback, ctx);
deleteManagedLedger(name, mlConfigFuture, callback, ctx);
} else {
future.thenAccept(ml -> {
// If it's open, delete in the normal way
Expand All @@ -847,7 +862,8 @@ public void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx)
/**
* Delete all managed ledger resources and metadata.
*/
void deleteManagedLedger(String managedLedgerName, DeleteLedgerCallback callback, Object ctx) {
void deleteManagedLedger(String managedLedgerName, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx) {
// Read the managed ledger metadata from store
asyncGetManagedLedgerInfo(managedLedgerName, new ManagedLedgerInfoCallback() {
@Override
Expand All @@ -859,7 +875,7 @@ public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
.map(e -> deleteCursor(bkc, managedLedgerName, e.getKey(), e.getValue()))
.collect(Collectors.toList());
Futures.waitForAll(futures).thenRun(() -> {
deleteManagedLedgerData(bkc, managedLedgerName, info, callback, ctx);
deleteManagedLedgerData(bkc, managedLedgerName, info, mlConfigFuture, callback, ctx);
}).exceptionally(ex -> {
callback.deleteLedgerFailed(new ManagedLedgerException(ex), ctx);
return null;
Expand All @@ -874,22 +890,80 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}

private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, ManagedLedgerInfo info,
DeleteLedgerCallback callback, Object ctx) {
CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx) {
final CompletableFuture<Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>>
ledgerInfosFuture = new CompletableFuture<>();
store.getManagedLedgerInfo(managedLedgerName, false, null,
new MetaStoreCallback<>() {
@Override
public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> infos = new HashMap<>();
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) {
infos.put(ls.getLedgerId(), ls);
}
ledgerInfosFuture.complete(infos);
}

@Override
public void operationFailed(MetaStoreException e) {
log.error("Failed to get managed ledger info for {}", managedLedgerName, e);
ledgerInfosFuture.completeExceptionally(e);
}
});

Futures.waitForAll(info.ledgers.stream()
.filter(li -> !li.isOffloaded)
.map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
.handle((result, ex) -> {
if (ex != null) {
int rc = BKException.getExceptionCode(ex);
if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
|| rc == BKException.Code.NoSuchLedgerExistsException) {
log.info("Ledger {} does not exist, ignoring", li.ledgerId);
return null;
}
throw new CompletionException(ex);
.map(li -> {
final CompletableFuture<Void> res;
if (li.isOffloaded) {
res = mlConfigFuture
.thenCombine(ledgerInfosFuture, Pair::of)
.thenCompose(pair -> {
ManagedLedgerConfig mlConfig = pair.getLeft();
Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfos = pair.getRight();

if (mlConfig == null || ledgerInfos == null) {
return CompletableFuture.completedFuture(null);
}

MLDataFormats.ManagedLedgerInfo.LedgerInfo ls = ledgerInfos.get(li.ledgerId);

if (ls.getOffloadContext().hasUidMsb()) {
MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
String driverName = OffloadUtils.getOffloadDriverName(ls,
mlConfig.getLedgerOffloader().getOffloadDriverName());
Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
mlConfig.getLedgerOffloader().getOffloadDriverMetadata());
OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);

UUID uuid = new UUID(ls.getOffloadContext().getUidMsb(),
ls.getOffloadContext().getUidLsb());
return OffloadUtils.cleanupOffloaded(li.ledgerId, uuid, mlConfig,
OffloadUtils.getOffloadDriverMetadata(ls,
mlConfig.getLedgerOffloader().getOffloadDriverMetadata()),
"Deletion", managedLedgerName, scheduledExecutor);
}
return result;
}))

return CompletableFuture.completedFuture(null);
});
} else {
res = CompletableFuture.completedFuture(null);
}
return res.thenCompose(__ -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
.handle((result, ex) -> {
if (ex != null) {
int rc = BKException.getExceptionCode(ex);
if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
|| rc == BKException.Code.NoSuchLedgerExistsException) {
log.info("Ledger {} does not exist, ignoring", li.ledgerId);
return null;
}
throw new CompletionException(ex);
}
return result;
}));
})
.collect(Collectors.toList()))
.thenRun(() -> {
// Delete the metadata
Expand Down
Loading

0 comments on commit 9026d19

Please sign in to comment.