diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index e42c2581ba101..21841544f8102 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -145,6 +145,16 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M */ void delete(String name) throws InterruptedException, ManagedLedgerException; + /** + * Delete a managed ledger. If it's not open, it's metadata will get regardless deleted. + * + * @param name + * @throws InterruptedException + * @throws ManagedLedgerException + */ + void delete(String name, CompletableFuture mlConfigFuture) + throws InterruptedException, ManagedLedgerException; + /** * Delete a managed ledger. If it's not open, it's metadata will get regardless deleted. * @@ -154,6 +164,16 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M */ void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx); + /** + * Delete a managed ledger. If it's not open, it's metadata will get regardless deleted. + * + * @param name + * @throws InterruptedException + * @throws ManagedLedgerException + */ + void asyncDelete(String name, CompletableFuture mlConfigFuture, + DeleteLedgerCallback callback, Object ctx); + /** * Releases all the resources maintained by the ManagedLedgerFactory. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index d7596a7468a40..e4bc53de52889 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -26,6 +26,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -71,6 +72,7 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl; +import org.apache.bookkeeper.mledger.offload.OffloadUtils; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; @@ -78,6 +80,7 @@ import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -802,12 +805,18 @@ public void operationFailed(MetaStoreException e) { @Override public void delete(String name) throws InterruptedException, ManagedLedgerException { + delete(name, CompletableFuture.completedFuture(null)); + } + + @Override + public void delete(String name, CompletableFuture mlConfigFuture) + throws InterruptedException, ManagedLedgerException { class Result { ManagedLedgerException e = null; } final Result r = new Result(); final CountDownLatch latch = new CountDownLatch(1); - asyncDelete(name, new DeleteLedgerCallback() { + asyncDelete(name, mlConfigFuture, new DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { latch.countDown(); @@ -829,10 +838,16 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { @Override public void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx) { + asyncDelete(name, CompletableFuture.completedFuture(null), callback, ctx); + } + + @Override + public void asyncDelete(String name, CompletableFuture mlConfigFuture, + DeleteLedgerCallback callback, Object ctx) { CompletableFuture future = ledgers.get(name); if (future == null) { // Managed ledger does not exist and we're not currently trying to open it - deleteManagedLedger(name, callback, ctx); + deleteManagedLedger(name, mlConfigFuture, callback, ctx); } else { future.thenAccept(ml -> { // If it's open, delete in the normal way @@ -847,7 +862,8 @@ public void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx) /** * Delete all managed ledger resources and metadata. */ - void deleteManagedLedger(String managedLedgerName, DeleteLedgerCallback callback, Object ctx) { + void deleteManagedLedger(String managedLedgerName, CompletableFuture mlConfigFuture, + DeleteLedgerCallback callback, Object ctx) { // Read the managed ledger metadata from store asyncGetManagedLedgerInfo(managedLedgerName, new ManagedLedgerInfoCallback() { @Override @@ -859,7 +875,7 @@ public void getInfoComplete(ManagedLedgerInfo info, Object ctx) { .map(e -> deleteCursor(bkc, managedLedgerName, e.getKey(), e.getValue())) .collect(Collectors.toList()); Futures.waitForAll(futures).thenRun(() -> { - deleteManagedLedgerData(bkc, managedLedgerName, info, callback, ctx); + deleteManagedLedgerData(bkc, managedLedgerName, info, mlConfigFuture, callback, ctx); }).exceptionally(ex -> { callback.deleteLedgerFailed(new ManagedLedgerException(ex), ctx); return null; @@ -874,22 +890,80 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) { } private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, ManagedLedgerInfo info, - DeleteLedgerCallback callback, Object ctx) { + CompletableFuture mlConfigFuture, + DeleteLedgerCallback callback, Object ctx) { + final CompletableFuture> + ledgerInfosFuture = new CompletableFuture<>(); + store.getManagedLedgerInfo(managedLedgerName, false, null, + new MetaStoreCallback<>() { + @Override + public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { + Map infos = new HashMap<>(); + for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) { + infos.put(ls.getLedgerId(), ls); + } + ledgerInfosFuture.complete(infos); + } + + @Override + public void operationFailed(MetaStoreException e) { + log.error("Failed to get managed ledger info for {}", managedLedgerName, e); + ledgerInfosFuture.completeExceptionally(e); + } + }); + Futures.waitForAll(info.ledgers.stream() - .filter(li -> !li.isOffloaded) - .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute() - .handle((result, ex) -> { - if (ex != null) { - int rc = BKException.getExceptionCode(ex); - if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException - || rc == BKException.Code.NoSuchLedgerExistsException) { - log.info("Ledger {} does not exist, ignoring", li.ledgerId); - return null; - } - throw new CompletionException(ex); + .map(li -> { + final CompletableFuture res; + if (li.isOffloaded) { + res = mlConfigFuture + .thenCombine(ledgerInfosFuture, Pair::of) + .thenCompose(pair -> { + ManagedLedgerConfig mlConfig = pair.getLeft(); + Map ledgerInfos = pair.getRight(); + + if (mlConfig == null || ledgerInfos == null) { + return CompletableFuture.completedFuture(null); + } + + MLDataFormats.ManagedLedgerInfo.LedgerInfo ls = ledgerInfos.get(li.ledgerId); + + if (ls.getOffloadContext().hasUidMsb()) { + MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); + newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true); + String driverName = OffloadUtils.getOffloadDriverName(ls, + mlConfig.getLedgerOffloader().getOffloadDriverName()); + Map driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls, + mlConfig.getLedgerOffloader().getOffloadDriverMetadata()); + OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata); + + UUID uuid = new UUID(ls.getOffloadContext().getUidMsb(), + ls.getOffloadContext().getUidLsb()); + return OffloadUtils.cleanupOffloaded(li.ledgerId, uuid, mlConfig, + OffloadUtils.getOffloadDriverMetadata(ls, + mlConfig.getLedgerOffloader().getOffloadDriverMetadata()), + "Deletion", managedLedgerName, scheduledExecutor); } - return result; - })) + + return CompletableFuture.completedFuture(null); + }); + } else { + res = CompletableFuture.completedFuture(null); + } + return res.thenCompose(__ -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute() + .handle((result, ex) -> { + if (ex != null) { + int rc = BKException.getExceptionCode(ex); + if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException + || rc == BKException.Code.NoSuchLedgerExistsException) { + log.info("Ledger {} does not exist, ignoring", li.ledgerId); + return null; + } + throw new CompletionException(ex); + } + return result; + })); + }) .collect(Collectors.toList())) .thenRun(() -> { // Delete the metadata diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 254ee767bc7fc..a4f94b2a9b71a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2450,7 +2450,7 @@ void internalTrimConsumedLedgers(CompletableFuture promise) { void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { if (!factory.isMetadataServiceAvailable()) { // Defer trimming of ledger if we cannot connect to metadata service - promise.complete(null); + promise.completeExceptionally(new MetaStoreException("Metadata service is not available")); return; } @@ -2722,11 +2722,30 @@ public void deleteLedgerFailed(ManagedLedgerException e, Object ctx) { @Override public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) { + // Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and // ledgers setFenced(); cancelScheduledTasks(); + // Truncate to ensure the offloaded data is not orphaned. + // Also ensures the BK ledgers are deleted and not just scheduled for deletion + CompletableFuture truncateFuture = asyncTruncate(); + truncateFuture.whenComplete((ignore, exc) -> { + if (exc != null) { + log.error("[{}] Error truncating ledger for deletion", name, exc); + callback.deleteLedgerFailed(exc instanceof ManagedLedgerException + ? (ManagedLedgerException) exc : new ManagedLedgerException(exc), + ctx); + } else { + asyncDeleteInternal(callback, ctx); + } + }); + + } + + private void asyncDeleteInternal(final DeleteLedgerCallback callback, final Object ctx) { + List cursors = Lists.newArrayList(this.cursors); if (cursors.isEmpty()) { // No cursors to delete, proceed with next step @@ -2784,10 +2803,9 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) { if (info.getOffloadContext().hasUidMsb()) { UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); - cleanupOffloaded(ledgerId, uuid, - OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()), + OffloadUtils.cleanupOffloaded(ledgerId, uuid, config, OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()), - "Trimming"); + "Trimming", name, scheduledExecutor); } } @@ -2842,7 +2860,7 @@ private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) { default: // Handle error log.warn("[{}] Failed to delete ledger {} -- {}", name, ls.getLedgerId(), - BKException.getMessage(rc)); + BKException.getMessage(rc) + " code " + rc); int toDelete = ledgersToDelete.get(); if (toDelete != -1 && ledgersToDelete.compareAndSet(toDelete, -1)) { // Trigger callback only once @@ -3031,18 +3049,17 @@ private void offloadLoop(CompletableFuture promise, Queue prepareLedgerInfoForOffloaded(long ledgerId, UUI oldInfo.getOffloadContext().getUidLsb()); log.info("[{}] Found previous offload attempt for ledger {}, uuid {}" + ", cleaning up", name, ledgerId, uuid); - cleanupOffloaded( + OffloadUtils.cleanupOffloaded( ledgerId, oldUuid, - OffloadUtils.getOffloadDriverName(oldInfo, - config.getLedgerOffloader().getOffloadDriverName()), + config, OffloadUtils.getOffloadDriverMetadata(oldInfo, config.getLedgerOffloader().getOffloadDriverMetadata()), - "Previous failed offload"); + "Previous failed offload", + name, + scheduledExecutor); } LedgerInfo.Builder builder = oldInfo.toBuilder(); builder.getOffloadContextBuilder() @@ -3230,28 +3248,6 @@ private CompletableFuture completeLedgerInfoForOffloaded(long ledgerId, UU }); } - private void cleanupOffloaded(long ledgerId, UUID uuid, String offloadDriverName, /* - * TODO: use driver name to - * identify offloader - */ - Map offloadDriverMetadata, String cleanupReason) { - log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the reason {}.", - name, ledgerId, uuid.toString(), cleanupReason); - Map metadataMap = new HashMap(); - metadataMap.putAll(offloadDriverMetadata); - metadataMap.put("ManagedLedgerName", name); - - Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10), - Retries.NonFatalPredicate, - () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, metadataMap), - scheduledExecutor, name).whenComplete((ignored, exception) -> { - if (exception != null) { - log.warn("[{}] Error cleaning up offload for {}, (cleanup reason: {})", - name, ledgerId, cleanupReason, exception); - } - }); - } - /** * Get the number of entries between a contiguous range of two positions. * @@ -3760,7 +3756,7 @@ public static ManagedLedgerException createManagedLedgerException(int bkErrorCod } else if (isBkErrorNotRecoverable(bkErrorCode)) { return new NonRecoverableLedgerException(BKException.getMessage(bkErrorCode)); } else { - return new ManagedLedgerException(BKException.getMessage(bkErrorCode)); + return new ManagedLedgerException(BKException.getMessage(bkErrorCode) + " error code: " + bkErrorCode); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java index 767a0c78b6d8a..3768c4dd61208 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java @@ -24,12 +24,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerMetadataBuilder; import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.common.util.Backoff; +import org.apache.bookkeeper.common.util.Retries; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext; @@ -181,4 +187,26 @@ public static LedgerMetadata parseLedgerMetadata(long id, byte[] bytes) throws I return builder.build(); } + + public static CompletableFuture cleanupOffloaded(long ledgerId, UUID uuid, ManagedLedgerConfig mlConfig, + Map offloadDriverMetadata, String cleanupReason, + String name, org.apache.bookkeeper.common.util.OrderedScheduler executor) { + log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the reason {}.", + name, ledgerId, uuid.toString(), cleanupReason); + Map metadataMap = new HashMap(); + metadataMap.putAll(offloadDriverMetadata); + metadataMap.put("ManagedLedgerName", name); + + return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), + TimeUnit.SECONDS.toHours(1)).limit(10), + Retries.NonFatalPredicate, + () -> mlConfig.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, metadataMap), + executor, name).whenComplete((ignored, exception) -> { + if (exception != null) { + log.warn("[{}] Error cleaning up offload for {}, (cleanup reason: {})", + name, ledgerId, cleanupReason, exception); + } + }); + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index a07a84f70bdc2..4484327ad8dc7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2992,7 +2992,8 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {}, Collections.emptyMap()); retryStrategically((test) -> responseException1.get() != null, 5, 1000); assertNotNull(responseException1.get()); - assertEquals(responseException1.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException)); + assertTrue(responseException1.get().getMessage() + .startsWith(BKException.getMessage(BKException.Code.TimeoutException))); // (2) test read-timeout for: ManagedLedger.asyncReadEntry(..) AtomicReference responseException2 = new AtomicReference<>(); @@ -3017,13 +3018,14 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { return responseException2.get() != null; }, 5, 1000); assertNotNull(responseException2.get()); - assertEquals(responseException2.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException)); + assertTrue(responseException2.get().getMessage() + .startsWith(BKException.getMessage(BKException.Code.TimeoutException))); ledger.close(); } /** - * It verifies that if bk-client doesn't complete the add-entry in given time out then broker is resilient enought + * It verifies that if bk-client doesn't complete the add-entry in given time out then broker is resilient enough * to create new ledger and add entry successfully. * * diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index ae0e53456e2d7..f35e40ce0529e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -34,10 +34,14 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BooleanSupplier; import java.util.stream.Collectors; + +import com.google.common.collect.Sets; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -679,6 +683,63 @@ public void testOffloadDelete() throws Exception { assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger)); } + @Test + public void testOffloadDeleteClosedLedger() throws Exception { + MockLedgerOffloader offloader = new MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(0, TimeUnit.MINUTES); + offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(100L); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L); + config.setLedgerOffloader(offloader); + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + ManagedCursor cursor = ledger.openCursor("foobar"); + + for (int i = 0; i < 15; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + + assertEquals(ledger.getLedgersInfoAsList().size(), 2); + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + assertEquals(ledger.getLedgersInfoAsList().size(), 2); + + assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()).count(), 1); + assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete()); + + Set offloadedledgers = Sets.newHashSet(offloader.offloadedLedgers()); + assertTrue(offloadedledgers.size() > 0); + + Set bkLedgersInMLedger = Sets.newHashSet(ledger.getLedgersInfo().keySet()); + assertTrue(bkLedgersInMLedger.size() > 0); + + factory.close(ledger); + ledger.close(); + + AtomicInteger success = new AtomicInteger(0); + factory.asyncDelete("my_test_ledger", CompletableFuture.completedFuture(config), + new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + success.set(1); + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + success.set(-1); + } + }, null); + assertEventuallyTrue(() -> success.get() == 1); + Set deletedledgers = offloader.deletedOffloads(); + assertEquals(offloadedledgers, deletedledgers); + + for (long ledgerId: bkLedgersInMLedger) { + assertFalse(bkc.getLedgers().contains(ledgerId)); + } + } + @Test public void testOffloadDeleteIncomplete() throws Exception { Set> deleted = ConcurrentHashMap.newKeySet(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8491615448aae..273af9460d98e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1039,7 +1039,9 @@ public CompletableFuture> getTopic(final TopicName topicName, bo } public CompletableFuture deleteTopic(String topic, boolean forceDelete) { + TopicName topicName = TopicName.get(topic); Optional optTopic = getTopicReference(topic); + if (optTopic.isPresent()) { Topic t = optTopic.get(); if (forceDelete) { @@ -1066,9 +1068,8 @@ public CompletableFuture deleteTopic(String topic, boolean forceDelete) { return t.delete(); } - if (log.isDebugEnabled()) { - log.debug("Topic {} is not loaded, try to delete from metadata", topic); - } + log.info("Topic {} is not loaded, try to delete from metadata", topic); + // Topic is not loaded, though we still might be able to delete from metadata TopicName tn = TopicName.get(topic); if (!tn.isPersistent()) { @@ -1077,28 +1078,29 @@ public CompletableFuture deleteTopic(String topic, boolean forceDelete) { } CompletableFuture future = new CompletableFuture<>(); - CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); + deleteTopicAuthenticationFuture.whenComplete((v, ex) -> { if (ex != null) { future.completeExceptionally(ex); return; } - managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { - future.complete(null); - } + CompletableFuture mlConfigFuture = getManagedLedgerConfig(topicName); + managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), + mlConfigFuture, new DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + future.complete(null); + } - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null); + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); }); - return future; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index fdcaaf2ffbdb3..68d348e50eca9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1031,10 +1031,12 @@ public CompletableFuture createSubscription(String subscriptionNam public CompletableFuture unsubscribe(String subscriptionName) { CompletableFuture unsubscribeFuture = new CompletableFuture<>(); + TopicName tn = TopicName.get(MLPendingAckStore + .getTransactionPendingAckStoreSuffix(topic, + Codec.encode(subscriptionName))); if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) { - getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore - .getTransactionPendingAckStoreSuffix(topic, - Codec.encode(subscriptionName))).getPersistenceNamingEncoding(), + getBrokerService().getManagedLedgerFactory().asyncDelete(tn.getPersistenceNamingEncoding(), + getBrokerService().getManagedLedgerConfig(tn), new AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { @@ -1191,53 +1193,69 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, .thenCompose(__ -> deleteTopicPolicies()) .thenCompose(__ -> transactionBufferCleanupAndClose()) .whenComplete((v, ex) -> { - if (ex != null) { - log.error("[{}] Error deleting topic", topic, ex); - unfenceTopicToResume(); - deleteFuture.completeExceptionally(ex); - } else { - List> subsDeleteFutures = new ArrayList<>(); - subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); - - FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { - if (e != null) { - log.error("[{}] Error deleting topic", topic, e); + if (ex != null) { + log.error("[{}] Error deleting topic", topic, ex); unfenceTopicToResume(); - deleteFuture.completeExceptionally(e); + deleteFuture.completeExceptionally(ex); } else { - ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(PersistentTopic.this); - - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - - subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - - unregisterTopicPolicyListener(); - - log.info("[{}] Topic deleted", topic); - deleteFuture.complete(null); - } - - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { - if (exception.getCause() - instanceof MetadataStoreException.NotFoundException) { - log.info("[{}] Topic is already deleted {}", - topic, exception.getMessage()); - deleteLedgerComplete(ctx); - } else { - unfenceTopicToResume(); - log.error("[{}] Error deleting topic", topic, exception); - deleteFuture.completeExceptionally(new PersistenceException(exception)); - } + List> subsDeleteFutures = new ArrayList<>(); + subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); + + FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { + if (e != null) { + log.error("[{}] Error deleting topic", topic, e); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(e); + } else { + // Truncate to ensure the offloaded data is not orphaned. + // Also ensures the BK ledgers are deleted and not just + // scheduled for deletion + CompletableFuture truncateFuture = ledger.asyncTruncate(); + truncateFuture.whenComplete((ignore, exc) -> { + if (e != null) { + log.error("[{}] Error truncating topic", topic, e); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(e); + } else { + ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + brokerService.removeTopicFromCache(PersistentTopic.this); + + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); + + unregisterTopicPolicyListener(); + + log.info("[{}] Topic deleted", topic); + deleteFuture.complete(null); + } + + @Override + public void + deleteLedgerFailed(ManagedLedgerException exception, + Object ctx) { + if (exception.getCause() + instanceof MetadataStoreException.NotFoundException) { + log.info("[{}] Topic is already deleted {}", + topic, exception.getMessage()); + deleteLedgerComplete(ctx); + } else { + unfenceTopicToResume(); + log.error("[{}] Error deleting topic", + topic, exception); + deleteFuture.completeExceptionally( + new PersistenceException(exception)); + } + } + }, null); + } + }); } - }, null); + }); } }); - } - }); } else { unfenceTopicToResume(); deleteFuture.completeExceptionally(new TopicBusyException( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 970bfd763a4e5..dd6d00f4cda27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1256,6 +1256,8 @@ public void testCloseTopic() throws Exception { @Test public void testDeleteTopic() throws Exception { + doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate(); + // create topic PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java index 1d7fb21062136..9e6b5261df16b 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java @@ -20,6 +20,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -32,14 +34,17 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite; +import org.awaitility.Awaitility; import org.testng.Assert; @Slf4j public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite { - private static final int ENTRY_SIZE = 1024; + protected int getEntrySize() { + return 1024; + }; - private static byte[] buildEntry(String pattern) { - byte[] entry = new byte[ENTRY_SIZE]; + private byte[] buildEntry(String pattern) { + byte[] entry = new byte[getEntrySize()]; byte[] patternBytes = pattern.getBytes(); for (int i = 0; i < entry.length; i++) { @@ -64,15 +69,24 @@ protected void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String admi long firstLedger = -1; try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); Producer producer = client.newProducer().topic(topic) + .maxPendingMessages(getNumEntriesPerLedger() / 2).sendTimeout(60, TimeUnit.SECONDS) .blockIfQueueFull(true).enableBatching(false).create();) { client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close(); // write enough to topic to make it roll int i = 0; - for (; i < ENTRIES_PER_LEDGER * 1.5; i++) { - producer.sendAsync(buildEntry("offload-message" + i)); + AtomicBoolean success = new AtomicBoolean(true); + + for (; i < getNumEntriesPerLedger() * 1.5; i++) { + producer.sendAsync(buildEntry("offload-message" + i)) + .exceptionally(e -> { + log.error("failed to send a message", e); + success.set(false); + return null; + });; } producer.flush(); + Assert.assertTrue(success.get()); } try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) { @@ -113,7 +127,7 @@ protected void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String admi try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); Consumer consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) { // read back from topic - for (int i = 0; i < ENTRIES_PER_LEDGER * 1.5; i++) { + for (int i = 0; i < getNumEntriesPerLedger() * 1.5; i++) { Message m = consumer.receive(1, TimeUnit.MINUTES); Assert.assertEquals(buildEntry("offload-message" + i), m.getData()); } @@ -138,25 +152,32 @@ protected void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, Strin long firstLedger = 0; try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); Producer producer = client.newProducer().topic(topic) - .blockIfQueueFull(true).enableBatching(false).create(); - ) { + .maxPendingMessages(getNumEntriesPerLedger() / 2).sendTimeout(60, TimeUnit.SECONDS) + .blockIfQueueFull(true).enableBatching(false).create()) { client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close(); + AtomicBoolean success = new AtomicBoolean(true); // write enough to topic to make it roll twice - for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) { - producer.sendAsync(buildEntry("offload-message" + i)); + for (int i = 0; i < getNumEntriesPerLedger() * 2.5; i++) { + producer.sendAsync(buildEntry("offload-message" + i)) + .exceptionally(e -> { + log.error("failed to send a message", e); + success.set(false); + return null; + });; } producer.flush(); + Assert.assertTrue(success.get()); } try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) { firstLedger = admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId; // wait up to 30 seconds for offload to occur - for (int i = 0; i < 300 && !admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) { - Thread.sleep(100); + for (int i = 0; i < 100 && !admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) { + Thread.sleep(300); } Assert.assertTrue(admin.topics().getInternalStats(topic).ledgers.get(0).offloaded); @@ -175,8 +196,9 @@ protected void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, Strin try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); Consumer consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) { // read back from topic - for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) { + for (int i = 0; i < getNumEntriesPerLedger() * 2.5; i++) { Message m = consumer.receive(1, TimeUnit.MINUTES); + Assert.assertNotNull(m); Assert.assertEquals(buildEntry("offload-message" + i), m.getData()); } } @@ -197,30 +219,52 @@ private boolean ledgerOffloaded(List le .map(l -> l.offloaded).findFirst().get(); } - private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) throws Exception { + private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) + throws Exception { + return writeAndWaitForOffload(serviceUrl, adminUrl, topic, -1); + } + + private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic, int partitionNum) + throws Exception { try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); Producer producer = client.newProducer().topic(topic) + .maxPendingMessages(getNumEntriesPerLedger() / 2).sendTimeout(60, TimeUnit.SECONDS) .blockIfQueueFull(true).enableBatching(false).create(); PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) { - List ledgers = admin.topics().getInternalStats(topic).ledgers; + String topicToCheck = partitionNum >= 0 + ? topic + "-partition-" + partitionNum + : topic; + + List ledgers = admin.topics() + .getInternalStats(topicToCheck).ledgers; long currentLedger = ledgers.get(ledgers.size() - 1).ledgerId; client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close(); + AtomicBoolean success = new AtomicBoolean(true); // write enough to topic to make it roll twice - for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) { - producer.sendAsync(buildEntry("offload-message" + i)); + for (int i = 0; + i < getNumEntriesPerLedger() * 2.5 * (partitionNum > 0 ? partitionNum + 1 : 1); + i++) { + producer.sendAsync(buildEntry("offload-message" + i)) + .exceptionally(e -> { + log.error("failed to send a message", e); + success.set(false); + return null; + }); } + producer.flush(); producer.send(buildEntry("final-offload-message")); + Assert.assertTrue(success.get()); // wait up to 30 seconds for offload to occur for (int i = 0; - i < 300 && !ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers, currentLedger); + i < 100 && !ledgerOffloaded(admin.topics().getInternalStats(topicToCheck).ledgers, currentLedger); i++) { - Thread.sleep(100); + Thread.sleep(300); } - Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers, currentLedger)); + Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topicToCheck).ledgers, currentLedger)); return currentLedger; } @@ -295,4 +339,130 @@ protected void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String Thread.sleep(5000); Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); } + + protected void testDeleteOffloadedTopic(String serviceUrl, String adminUrl, + boolean unloadBeforeDelete, int numPartitions) throws Exception { + final String tenant = "offload-test-cli-" + randomName(4); + final String namespace = tenant + "/ns1"; + final String topic = "persistent://" + namespace + "/topic1"; + + pulsarCluster.runAdminCommandOnAnyBroker("tenants", + "create", "--allowed-clusters", pulsarCluster.getClusterName(), + "--admin-roles", "offload-admin", tenant); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "create", "--clusters", pulsarCluster.getClusterName(), namespace); + + // set threshold to offload runs immediately after role + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-offload-threshold", "--size", "0", namespace); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-retention", "--size", "100M", "--time", "100m", namespace); + + String output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("Unset for namespace")); + + if (numPartitions > 0) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", + "create-partitioned-topic", topic, + "--partitions", Integer.toString(numPartitions)); + } else { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "create", topic); + } + + long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic, numPartitions - 1); + // give it up to 5 seconds to delete, it shouldn't + // so we wait this every time + Thread.sleep(5000); + Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace, + "--lag", "0m"); + output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("0 minute(s)")); + + offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic, numPartitions - 1); + // wait up to 10 seconds for ledger to be deleted + for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++) { + writeAndWaitForOffload(serviceUrl, adminUrl, topic, numPartitions - 1); + Thread.sleep(1000); + } + + Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger)); + Assert.assertTrue(offloadedLedgerExists(topic, numPartitions - 1, offloadedLedger)); + + if (unloadBeforeDelete) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "unload", topic); + } + if (numPartitions > 0) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete-partitioned-topic", topic); + } else { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete", topic); + } + final long ledgerId = offloadedLedger; + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertFalse(offloadedLedgerExists(topic, numPartitions - 1, ledgerId)); + }); + } + + protected void testDeleteOffloadedTopicExistsInBk(String serviceUrl, String adminUrl, + boolean unloadBeforeDelete, int numPartitions) throws Exception { + final String tenant = "offload-test-cli-" + randomName(4); + final String namespace = tenant + "/ns1"; + final String topic = "persistent://" + namespace + "/topic1"; + + pulsarCluster.runAdminCommandOnAnyBroker("tenants", + "create", "--allowed-clusters", pulsarCluster.getClusterName(), + "--admin-roles", "offload-admin", tenant); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "create", "--clusters", pulsarCluster.getClusterName(), namespace); + + // set threshold to offload runs immediately after role + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-offload-threshold", "--size", "0", namespace); + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-retention", "--size", "100M", "--time", "100m", namespace); + + if (numPartitions > 0) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", + "create-partitioned-topic", topic, + "--partitions", Integer.toString(numPartitions)); + } else { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "create", topic); + } + + String output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("Unset for namespace")); + + long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic, numPartitions - 1); + // give it up to 5 seconds to delete, it shouldn't + // so we wait this every time + Thread.sleep(5000); + Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); + + Assert.assertTrue(offloadedLedgerExists(topic, numPartitions - 1, offloadedLedger)); + + if (unloadBeforeDelete) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "unload", topic); + } + if (numPartitions > 0) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete-partitioned-topic", topic); + } else { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete", topic); + } + final long ledgerId = offloadedLedger; + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertFalse(offloadedLedgerExists(topic, numPartitions - 1, ledgerId)); + }); + Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger)); + } + + protected boolean offloadedLedgerExists(String topic, int partitionNum, long firstLedger) { + throw new RuntimeException("not implemented"); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java index 808aae62e7419..48b86e8a1f45d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java @@ -41,18 +41,17 @@ public void testPublishOffloadAndConsumeViaThreshold(Supplier serviceUrl @Test(dataProvider = "ServiceAndAdminUrls") public void testPublishOffloadAndConsumeDeletionLag(Supplier serviceUrl, Supplier adminUrl) throws Exception { super.testPublishOffloadAndConsumeDeletionLag(serviceUrl.get(), adminUrl.get()); - } - @Override protected Map getEnv() { Map result = new HashMap<>(); - result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER)); + result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger())); result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); result.put("managedLedgerOffloadDriver", "filesystem"); result.put("fileSystemURI", "file:///"); return result; } + } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java new file mode 100644 index 0000000000000..4b1739a0cd13b --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.offload; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +@Slf4j +public class TestOffloadDeletionFS extends TestBaseOffload { + + @Override + protected int getEntrySize() { + return 512; + } + + @Override + protected int getNumEntriesPerLedger() { + return 200; + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteOffloadedTopic(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), false, 0); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteUnloadedOffloadedTopic(Supplier serviceUrl, Supplier adminUrl) + throws Exception { + super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), true, 0); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteOffloadedTopicExistsInBk(Supplier serviceUrl, Supplier adminUrl) + throws Exception { + super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), adminUrl.get(), false, 0); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteUnloadedOffloadedTopicExistsInBk(Supplier serviceUrl, Supplier adminUrl) + throws Exception { + super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), adminUrl.get(), true, 0); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteOffloadedPartitionedTopic(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), false, 3); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteUnloadedOffloadedPartitionedTopic(Supplier serviceUrl, Supplier adminUrl) + throws Exception { + super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), true, 3); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteOffloadedPartitionedTopicExistsInBk(Supplier serviceUrl, Supplier adminUrl) + throws Exception { + super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), adminUrl.get(), false, 3); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteUnloadedOffloadedPartitionedTopicExistsInBk(Supplier serviceUrl, + Supplier adminUrl) throws Exception { + super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), adminUrl.get(), true, 3); + } + + @Override + protected Map getEnv() { + Map result = new HashMap<>(); + result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger())); + result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); + result.put("managedLedgerOffloadDriver", "filesystem"); + result.put("fileSystemURI", "file:///"); + + return result; + } + + @Override + protected boolean offloadedLedgerExists(String topic, int partitionNum, long ledger) { + log.info("offloadedLedgerExists(topic = {}, partitionNum={},ledger={})", + topic, partitionNum, ledger); + if (partitionNum > -1) { + topic = topic + "-partition-" + partitionNum; + } + String managedLedgerName = TopicName.get(topic).getPersistenceNamingEncoding(); + String rootPath = "pulsar/"; + String dirPath = rootPath + managedLedgerName + "/"; + + List result = new LinkedList<>(); + String[] cmds = { + "ls", + "-1", + dirPath + }; + pulsarCluster.getBrokers().forEach(broker -> { + try { + ContainerExecResult res = broker.execCmd(cmds); + log.info("offloadedLedgerExists broker {} 'ls -1 {}' got {}", + broker.getContainerName(), dirPath, res.getStdout()); + Arrays.stream(res.getStdout().split("\n")) + .filter(x -> x.startsWith(ledger + "-")) + .forEach(x -> result.add(x)); + } catch (ContainerExecException ce) { + log.info("offloadedLedgerExists broker {} 'ls -1 {}' got error code {}", + broker.getContainerName(), dirPath, ce.getResult().getExitCode()); + // ignore 2 (No such file or directory) + if (ce.getResult().getExitCode() != 2) { + throw new RuntimeException(ce); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + return !result.isEmpty(); + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java index edbbcfeba5e10..a230b13e215f5 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java @@ -73,7 +73,7 @@ public void testPublishOffloadAndConsumeDeletionLag(Supplier serviceUrl, @Override protected Map getEnv() { Map result = new HashMap<>(); - result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER)); + result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger())); result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); result.put("managedLedgerOffloadDriver", "aws-s3"); result.put("s3ManagedLedgerOffloadBucket", "pulsar-integtest"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java index 9c53d801ea1eb..ef7406113f6ee 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java @@ -72,7 +72,7 @@ public void testPublishOffloadAndConsumeDeletionLag(Supplier serviceUrl, @Override protected Map getEnv() { Map result = new HashMap<>(); - result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER)); + result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger())); result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); result.put("managedLedgerOffloadDriver", "aws-s3"); result.put("managedLedgerOffloadBucket", "pulsar-integtest"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java index 7811b38e0fd92..1c6bb9dc3f34c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java @@ -31,7 +31,9 @@ @Slf4j public abstract class PulsarTieredStorageTestSuite extends PulsarClusterTestBase { - protected static final int ENTRIES_PER_LEDGER = 1024; + protected int getNumEntriesPerLedger() { + return 1024; + } @BeforeClass(alwaysRun = true) @Override