From 0f4213d75d79d465418ae8ab87a77fb944b97fa3 Mon Sep 17 00:00:00 2001 From: Will Dey Date: Wed, 23 Oct 2024 21:17:23 -0400 Subject: [PATCH] no behavior changes, only dry-run --- .../cassandra/db/ColumnFamilyStore.java | 5 +- .../db/compaction/CompactionTask.java | 189 +++++++++--------- .../cassandra/service/CassandraDaemon.java | 5 +- .../cassandra/db/ColumnFamilyStoreTest.java | 66 +----- 4 files changed, 95 insertions(+), 170 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index fe711888b4..60f2d25dd5 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -108,7 +108,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); private static final boolean DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP = Boolean.getBoolean( - "palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup"); + "palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup"); private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), StageManager.KEEPALIVE, @@ -763,8 +763,7 @@ public static void removeUnusedSstables(CFMetaData metadata, Map Descriptor desc = sstableFiles.getKey(); if (completedAncestors.contains(desc.generation)) { - if (ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanupBasedOnAncestorMetadata() - || (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty())) + if (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty()) { logger.warn("Would have deleted leftover compaction ancestor", UnsafeArg.of("desc", desc), SafeArg.of("keyspace", desc.ksname), SafeArg.of("cf", desc.cfname), diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index f2fb92d0c7..a03b56bca4 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -30,7 +30,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.palantir.cassandra.db.ColumnFamilyStoreManager; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter; @@ -100,7 +99,6 @@ public boolean reduceScopeForLimitedSpace(long expectedSize) * which are properly serialized. * Caller is in charge of marking/unmarking the sstables as compacting. */ - @SuppressWarnings("resource") // It is dangerous to close refs for a failed transaction protected void runMayThrow() throws Exception { // The collection of sstables passed may be empty (but not null); even if @@ -157,123 +155,116 @@ public boolean apply(SSTableReader sstable) long totalKeysWritten = 0; long estimatedKeys = 0; - CompactionController controller = getCompactionController(transaction.originals()); - Set actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables()); - - SSTableFormat.Type sstableFormat = getFormatType(transaction.originals()); - - - // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references - // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed. - // See CASSANDRA-8019 and CASSANDRA-8399 - boolean abortFailed = false; - UUID taskId = null; - String taskIdLoggerMsg; - List newSStables; - AbstractCompactionIterable ci = null; - Refs refs = Refs.ref(actuallyCompact); - boolean readyToFinish = false; - try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) + try (CompactionController controller = getCompactionController(transaction.originals())) { - taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals()); - taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString(); - logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg); - ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId); - try (CloseableIterator iter = ci.iterator()) - { - long lastCheckObsoletion = start; + Set actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables()); - if (!controller.cfs.getCompactionStrategy().isActive) - throw new CompactionInterruptedException(ci.getCompactionInfo()); + SSTableFormat.Type sstableFormat = getFormatType(transaction.originals()); - if (collector != null) - collector.beginCompaction(ci); - try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) + // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references + // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed. + // See CASSANDRA-8019 and CASSANDRA-8399 + boolean abortFailed = false; + UUID taskId = null; + String taskIdLoggerMsg; + List newSStables; + AbstractCompactionIterable ci = null; + try (Refs refs = Refs.ref(actuallyCompact); + AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) + { + taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals()); + taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString(); + logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg); + ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId); + try (CloseableIterator iter = ci.iterator()) { - estimatedKeys = writer.estimatedKeys(); - while (iter.hasNext()) - { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); + long lastCheckObsoletion = start; + + if (!controller.cfs.getCompactionStrategy().isActive) + throw new CompactionInterruptedException(ci.getCompactionInfo()); - try (AbstractCompactedRow row = iter.next()) + if (collector != null) + collector.beginCompaction(ci); + + boolean readyToFinish = false; + try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) + { + estimatedKeys = writer.estimatedKeys(); + while (iter.hasNext()) { - if (writer.append(row)) - totalKeysWritten++; + if (ci.isStopRequested()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); - if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L)) + try (AbstractCompactedRow row = iter.next()) { - controller.maybeRefreshOverlaps(); - lastCheckObsoletion = System.nanoTime(); + if (writer.append(row)) + totalKeysWritten++; + + if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L)) + { + controller.maybeRefreshOverlaps(); + lastCheckObsoletion = System.nanoTime(); + } } } - } - readyToFinish = true; - newSStables = writer.finish(); - } catch (Exception e) { - CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e); - if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0) - { - abortFailed = true; - logger.warn("CompactionAwareWriter failed to close correctly for {}/{}. This compaction won't be removed from " + - "system.compactions_in_progress to ensure sstable cleanup on startup proceeds correctly in case some " + - "compaction-product sstables are marked final while others remain tmp", - cfs.keyspace.getName(), cfs.name, e); + readyToFinish = true; + newSStables = writer.finish(); + } catch (Exception e) { + CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e); + if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0) + { + abortFailed = true; + logger.warn("CompactionAwareWriter failed to close correctly for {}/{}. This compaction won't be removed from " + + "system.compactions_in_progress to ensure sstable cleanup on startup proceeds correctly in case some " + + "compaction-product sstables are marked final while others remain tmp", + cfs.keyspace.getName(), cfs.name, e); + } + throw exception; } - throw exception; } } - } - finally - { - if (!readyToFinish || !ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanupBasedOnAncestorMetadata()) { - // TODO(wdey): refactor all of the trys - try (Refs closedRefs = refs; CompactionController closedController = controller) {} - } - Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS); - if (taskId != null && (!abortFailed) && !ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanupBasedOnAncestorMetadata()) - SystemKeyspace.finishCompaction(taskId); + finally + { + Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS); + if (taskId != null && (!abortFailed)) + SystemKeyspace.finishCompaction(taskId); - if (collector != null && ci != null) - collector.finishCompaction(ci); - } + if (collector != null && ci != null) + collector.finishCompaction(ci); + } - ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors()); - if (ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanupBasedOnAncestorMetadata()) { - SystemKeyspace.finishCompaction(taskId); - } - refs.close(); - controller.close(); + ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors()); - // log a bunch of statistics about the result and save to system table compaction_history - long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - long startsize = SSTableReader.getTotalBytes(transaction.originals()); - long endsize = SSTableReader.getTotalBytes(newSStables); - double ratio = (double) endsize / (double) startsize; + // log a bunch of statistics about the result and save to system table compaction_history + long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + long startsize = SSTableReader.getTotalBytes(transaction.originals()); + long endsize = SSTableReader.getTotalBytes(newSStables); + double ratio = (double) endsize / (double) startsize; - StringBuilder newSSTableNames = new StringBuilder(); - for (SSTableReader reader : newSStables) - newSSTableNames.append(reader.descriptor.baseFilename()).append(","); + StringBuilder newSSTableNames = new StringBuilder(); + for (SSTableReader reader : newSStables) + newSSTableNames.append(reader.descriptor.baseFilename()).append(","); - if (offline) - { - Refs.release(Refs.selfRefs(newSStables)); - } - else - { - double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; - long totalSourceRows = 0; - String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize); - logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", - taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); - logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); - logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten)); - - // update the metrics - cfs.metric.compactionBytesWritten.inc(endsize); - cfs.metric.compactionsCompleted.inc(); + if (offline) + { + Refs.release(Refs.selfRefs(newSStables)); + } + else + { + double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; + long totalSourceRows = 0; + String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize); + logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", + taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); + logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); + logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten)); + + // update the metrics + cfs.metric.compactionBytesWritten.inc(endsize); + cfs.metric.compactionsCompleted.inc(); + } } } diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index d5f4b9c3cd..eaace17482 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -61,7 +61,6 @@ import com.palantir.cassandra.concurrent.LocalReadRunnableTimeoutWatcher; import com.palantir.cassandra.db.BootstrappingSafetyException; -import com.palantir.cassandra.db.ColumnFamilyStoreManager; import com.palantir.cassandra.settings.DisableClientInterfaceSetting; import org.apache.cassandra.config.Config; import org.slf4j.Logger; @@ -270,9 +269,7 @@ private void completeSetupMayThrowSstableException() { for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values()) { - if (ColumnFamilyStoreManager.instance.shouldRemoveUnusedSstablesBasedOnAncestorMetadata()) { - ColumnFamilyStore.removeUnusedSstables(cfm, unfinishedCompactions.getOrDefault(cfm.ksAndCFName, ImmutableMap.of())); - } + ColumnFamilyStore.removeUnusedSstables(cfm, unfinishedCompactions.getOrDefault(cfm.ksAndCFName, ImmutableMap.of())); ColumnFamilyStore.scrubDataDirectories(cfm); } } diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 1ab735b637..3b8279c7e4 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -118,7 +118,6 @@ public class ColumnFamilyStoreTest public static final String CF_STANDARD5 = "Standard5"; public static final String CF_STANDARD6 = "Standard6"; public static final String CF_STANDARD7 = "Standard7"; - public static final String CF_STANDARD8 = "Standard8"; public static final String CF_STANDARDINT = "StandardInteger1"; public static final String CF_SUPER1 = "Super1"; public static final String CF_SUPER6 = "Super6"; @@ -149,7 +148,6 @@ public static void defineSchema() throws ConfigurationException SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD5), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD6), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD7), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD8), SchemaLoader.indexCFMD(KEYSPACE1, CF_INDEX1, true), SchemaLoader.indexCFMD(KEYSPACE1, CF_INDEX2, false), SchemaLoader.superCFMD(KEYSPACE1, CF_SUPER1, LongType.instance), @@ -1986,14 +1984,13 @@ public void testRemoveUnusedSstablesOnlyRemovesFiltered() throws IOException .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); }; - try - { + try { ColumnFamilyStoreManager.instance.registerValidator(validator); ColumnFamilyStore.removeUnusedSstables(cfmeta, ImmutableMap.of()); } finally { - ColumnFamilyStoreManager.instance.unregisterValidator(); + ColumnFamilyStoreManager.instance.unregisterValidator(validator); } sstables = dir.sstableLister().list(); @@ -2005,65 +2002,6 @@ public void testRemoveUnusedSstablesOnlyRemovesFiltered() throws IOException assertEquals(expected, sstables.keySet()); } - @Test - public void testShouldSkipAncestorCleanupSkipsAncestorCleanup() throws IOException - { - final String ks = KEYSPACE1; - final String cf = CF_STANDARD8; - - final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf); - Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf).disableAutoCompaction(); - Directories dir = new Directories(cfmeta); - - int gen1 = writeNextGenerationSstable(ImmutableSet.of(), dir, cfmeta); - int gen2 = writeNextGenerationSstable(ImmutableSet.of(), dir, cfmeta); - int gen3 = writeNextGenerationSstable(ImmutableSet.of(gen1, gen2), dir, cfmeta); - int gen4 = writeNextGenerationSstable(ImmutableSet.of(), dir, cfmeta); - int gen5 = writeNextGenerationSstable(ImmutableSet.of(gen4), dir, cfmeta); - - Map> sstables = dir.sstableLister().list(); - Descriptor sstable3Desc = sstables.keySet().iterator().next().withGeneration(gen3); - assertEquals(5, sstables.size()); - assertTrue(sstables.containsKey(sstable3Desc)); - - IColumnFamilyStoreValidator validator = new IColumnFamilyStoreValidator() - { - @Override - public Map> filterValidAncestors(CFMetaData _cfMetaData, Map> sstableToCompletedAncestors, Map _unfinishedCompactions) - { - Set allowedGenerations = ImmutableSet.of(gen1, gen2, gen4, gen5); - return sstableToCompletedAncestors.entrySet().stream() - .filter(entry -> allowedGenerations.contains(entry.getKey().generation)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - @Override - public boolean shouldSkipAncestorCleanupBasedOnAncestorMetadata() - { - return true; - } - }; - - try - { - ColumnFamilyStoreManager.instance.registerValidator(validator); - ColumnFamilyStore.removeUnusedSstables(cfmeta, ImmutableMap.of()); - } - finally - { - ColumnFamilyStoreManager.instance.unregisterValidator(); - } - - sstables = dir.sstableLister().list(); - ImmutableSet expected = ImmutableSet.of( - sstable3Desc.withGeneration(gen1), - sstable3Desc.withGeneration(gen2), - sstable3Desc.withGeneration(gen3), - sstable3Desc.withGeneration(gen4), - sstable3Desc.withGeneration(gen5)); - assertEquals(expected, sstables.keySet()); - } - @Test public void testLoadNewSSTablesAvoidsOverwrites() throws Throwable {