
Commit

no behavior changes, only dry-run
wi11dey committed Oct 24, 2024
1 parent a3907a3 commit 0f4213d
Showing 4 changed files with 95 additions and 170 deletions.
5 changes: 2 additions & 3 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -108,7 +108,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
     private static final boolean DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP = Boolean.getBoolean(
             "palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup");
 
     private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                           StageManager.KEEPALIVE,
@@ -763,8 +763,7 @@ public static void removeUnusedSstables(CFMetaData metadata, Map<Integer, UUID>
                 Descriptor desc = sstableFiles.getKey();
                 if (completedAncestors.contains(desc.generation))
                 {
-                    if (ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanupBasedOnAncestorMetadata()
-                        || (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty()))
+                    if (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty())
                     {
                         logger.warn("Would have deleted leftover compaction ancestor", UnsafeArg.of("desc", desc),
                                     SafeArg.of("keyspace", desc.ksname), SafeArg.of("cf", desc.cfname),
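For context, a minimal self-contained sketch of the dry-run gate this hunk now relies on. Boolean.getBoolean reads a JVM system property and defaults to false, so the log-only path is opt-in (enable it with -Dpalantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup=true). CleanupSketch and its parameters are hypothetical stand-ins for illustration, not Cassandra code:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CleanupSketch
{
    private static final Logger logger = LoggerFactory.getLogger(CleanupSketch.class);

    // Same pattern as DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP above:
    // false unless the JVM was started with the -D flag set to "true".
    private static final boolean DRY_RUN = Boolean.getBoolean(
            "palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup");

    public static void cleanUp(String ancestorDescriptor, boolean noUnfinishedCompactions)
    {
        if (DRY_RUN && noUnfinishedCompactions)
        {
            // Log-only branch: nothing is deleted, matching the commit message
            // "no behavior changes, only dry-run".
            logger.warn("Would have deleted leftover compaction ancestor {}", ancestorDescriptor);
        }
    }
}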
189 changes: 90 additions & 99 deletions src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -30,7 +30,6 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
-import com.palantir.cassandra.db.ColumnFamilyStoreManager;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
@@ -100,7 +99,6 @@ public boolean reduceScopeForLimitedSpace(long expectedSize)
      * which are properly serialized.
      * Caller is in charge of marking/unmarking the sstables as compacting.
      */
-    @SuppressWarnings("resource") // It is dangerous to close refs for a failed transaction
     protected void runMayThrow() throws Exception
     {
         // The collection of sstables passed may be empty (but not null); even if
@@ -157,123 +155,116 @@ public boolean apply(SSTableReader sstable)
         long totalKeysWritten = 0;
 
         long estimatedKeys = 0;
-        CompactionController controller = getCompactionController(transaction.originals());
-        Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
-
-        SSTableFormat.Type sstableFormat = getFormatType(transaction.originals());
-
-
-        // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
-        // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
-        // See CASSANDRA-8019 and CASSANDRA-8399
-        boolean abortFailed = false;
-        UUID taskId = null;
-        String taskIdLoggerMsg;
-        List<SSTableReader> newSStables;
-        AbstractCompactionIterable ci = null;
-        Refs<SSTableReader> refs = Refs.ref(actuallyCompact);
-        boolean readyToFinish = false;
-        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+        try (CompactionController controller = getCompactionController(transaction.originals()))
         {
-            taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals());
-            taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString();
-            logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg);
-            ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
-            try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
-            {
-                long lastCheckObsoletion = start;
+            Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
 
-                if (!controller.cfs.getCompactionStrategy().isActive)
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
+            SSTableFormat.Type sstableFormat = getFormatType(transaction.originals());
 
-                if (collector != null)
-                    collector.beginCompaction(ci);
-
-                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
+            // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
+            // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
+            // See CASSANDRA-8019 and CASSANDRA-8399
+            boolean abortFailed = false;
+            UUID taskId = null;
+            String taskIdLoggerMsg;
+            List<SSTableReader> newSStables;
+            AbstractCompactionIterable ci = null;
+            try (Refs<SSTableReader> refs = Refs.ref(actuallyCompact);
+                 AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
             {
+                taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals());
+                taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString();
+                logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg);
+                ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
+                try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+                {
-                    estimatedKeys = writer.estimatedKeys();
-                    while (iter.hasNext())
-                    {
-                        if (ci.isStopRequested())
-                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+                    long lastCheckObsoletion = start;
 
+                    if (!controller.cfs.getCompactionStrategy().isActive)
+                        throw new CompactionInterruptedException(ci.getCompactionInfo());
+
-                        try (AbstractCompactedRow row = iter.next())
+                    if (collector != null)
+                        collector.beginCompaction(ci);
+
+                    boolean readyToFinish = false;
+                    try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
                     {
+                        estimatedKeys = writer.estimatedKeys();
+                        while (iter.hasNext())
+                        {
-                            if (writer.append(row))
-                                totalKeysWritten++;
+                            if (ci.isStopRequested())
+                                throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                            if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
+                            try (AbstractCompactedRow row = iter.next())
                             {
-                                controller.maybeRefreshOverlaps();
-                                lastCheckObsoletion = System.nanoTime();
+                                if (writer.append(row))
+                                    totalKeysWritten++;
+
+                                if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
+                                {
+                                    controller.maybeRefreshOverlaps();
+                                    lastCheckObsoletion = System.nanoTime();
+                                }
                             }
                         }
-                    }
 
-                    readyToFinish = true;
-                    newSStables = writer.finish();
-                } catch (Exception e) {
-                    CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e);
-                    if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0)
-                    {
-                        abortFailed = true;
-                        logger.warn("CompactionAwareWriter failed to close correctly for {}/{}. This compaction won't be removed from " +
-                                    "system.compactions_in_progress to ensure sstable cleanup on startup proceeds correctly in case some " +
-                                    "compaction-product sstables are marked final while others remain tmp",
-                                    cfs.keyspace.getName(), cfs.name, e);
+                        readyToFinish = true;
+                        newSStables = writer.finish();
+                    } catch (Exception e) {
+                        CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e);
+                        if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0)
+                        {
+                            abortFailed = true;
+                            logger.warn("CompactionAwareWriter failed to close correctly for {}/{}. This compaction won't be removed from " +
+                                        "system.compactions_in_progress to ensure sstable cleanup on startup proceeds correctly in case some " +
+                                        "compaction-product sstables are marked final while others remain tmp",
+                                        cfs.keyspace.getName(), cfs.name, e);
                         }
+                        throw exception;
+                    }
-                    throw exception;
                 }
             }
-        }
-        finally
-        {
-            if (!readyToFinish || !ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanupBasedOnAncestorMetadata()) {
-                // TODO(wdey): refactor all of the trys
-                try (Refs closedRefs = refs; CompactionController closedController = controller) {}
-            }
-            Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS);
-            if (taskId != null && (!abortFailed) && !ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanupBasedOnAncestorMetadata())
-                SystemKeyspace.finishCompaction(taskId);
+            finally
+            {
+                Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS);
+                if (taskId != null && (!abortFailed))
+                    SystemKeyspace.finishCompaction(taskId);
 
-            if (collector != null && ci != null)
-                collector.finishCompaction(ci);
-        }
+                if (collector != null && ci != null)
+                    collector.finishCompaction(ci);
+            }
 
-        ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors());
-        if (ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanupBasedOnAncestorMetadata()) {
-            SystemKeyspace.finishCompaction(taskId);
-        }
-        refs.close();
-        controller.close();
+            ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors());
 
-        // log a bunch of statistics about the result and save to system table compaction_history
-        long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        long startsize = SSTableReader.getTotalBytes(transaction.originals());
-        long endsize = SSTableReader.getTotalBytes(newSStables);
-        double ratio = (double) endsize / (double) startsize;
+            // log a bunch of statistics about the result and save to system table compaction_history
+            long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            long startsize = SSTableReader.getTotalBytes(transaction.originals());
+            long endsize = SSTableReader.getTotalBytes(newSStables);
+            double ratio = (double) endsize / (double) startsize;
 
-        StringBuilder newSSTableNames = new StringBuilder();
-        for (SSTableReader reader : newSStables)
-            newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+            StringBuilder newSSTableNames = new StringBuilder();
+            for (SSTableReader reader : newSStables)
+                newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
 
-        if (offline)
-        {
-            Refs.release(Refs.selfRefs(newSStables));
-        }
-        else
-        {
-            double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-            long totalSourceRows = 0;
-            String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
-            logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
-                                       taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
-            logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-            logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten));
-
-            // update the metrics
-            cfs.metric.compactionBytesWritten.inc(endsize);
-            cfs.metric.compactionsCompleted.inc();
+            if (offline)
+            {
+                Refs.release(Refs.selfRefs(newSStables));
+            }
+            else
+            {
+                double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+                long totalSourceRows = 0;
+                String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
+                logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+                                           taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
+                logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+                logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten));
+
+                // update the metrics
+                cfs.metric.compactionBytesWritten.inc(endsize);
+                cfs.metric.compactionsCompleted.inc();
+            }
         }
     }
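The heart of this restructuring is moving controller, refs, and scanners from manual close() calls into try-with-resources blocks. A minimal sketch (plain Java, not Cassandra code) of the two language guarantees the new code leans on: resources declared in one try close in reverse declaration order on every path, and a failure inside close() is attached to the primary exception as a suppressed exception, which is what the catch block above inspects via e.getSuppressed():

public final class TryWithResourcesDemo
{
    static final class Resource implements AutoCloseable
    {
        private final String name;
        private final boolean failOnClose;

        Resource(String name, boolean failOnClose)
        {
            this.name = name;
            this.failOnClose = failOnClose;
        }

        @Override
        public void close()
        {
            System.out.println("closing " + name);
            if (failOnClose)
                throw new IllegalStateException("close failed for " + name);
        }
    }

    public static void main(String[] args)
    {
        try (Resource controller = new Resource("controller", false);
             Resource refs = new Resource("refs", false);
             Resource scanners = new Resource("scanners", true))
        {
            throw new RuntimeException("compaction failed");
        }
        catch (RuntimeException e)
        {
            // The IllegalStateException from scanners.close() does not replace the
            // primary failure; it arrives as e.getSuppressed()[0].
            System.out.println(e.getMessage() + " suppressed=" + e.getSuppressed().length);
        }
    }
}

Running it prints "closing scanners", "closing refs", "closing controller" in that (reverse) order, then "compaction failed suppressed=1" -- the same guarantee the old manual refs.close()/controller.close() calls and the finally block tried to provide by hand.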

5 changes: 1 addition & 4 deletions src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -61,7 +61,6 @@

 import com.palantir.cassandra.concurrent.LocalReadRunnableTimeoutWatcher;
 import com.palantir.cassandra.db.BootstrappingSafetyException;
-import com.palantir.cassandra.db.ColumnFamilyStoreManager;
 import com.palantir.cassandra.settings.DisableClientInterfaceSetting;
 import org.apache.cassandra.config.Config;
 import org.slf4j.Logger;
@@ -270,9 +269,7 @@ private void completeSetupMayThrowSstableException() {

             for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
             {
-                if (ColumnFamilyStoreManager.instance.shouldRemoveUnusedSstablesBasedOnAncestorMetadata()) {
-                    ColumnFamilyStore.removeUnusedSstables(cfm, unfinishedCompactions.getOrDefault(cfm.ksAndCFName, ImmutableMap.of()));
-                }
+                ColumnFamilyStore.removeUnusedSstables(cfm, unfinishedCompactions.getOrDefault(cfm.ksAndCFName, ImmutableMap.of()));
                 ColumnFamilyStore.scrubDataDirectories(cfm);
             }
         }
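This startup hunk ties back to the dry-run gate in ColumnFamilyStore: removeUnusedSstables now runs unconditionally, and when no unfinished compactions were recorded for a table, getOrDefault supplies an empty map, so (with the system property enabled) only the "Would have deleted" warning fires. A hypothetical sketch of that getOrDefault behavior, not Cassandra code:

import com.google.common.collect.ImmutableMap;

import java.util.Map;
import java.util.UUID;

public final class StartupCleanupSketch
{
    public static void main(String[] args)
    {
        // No unfinished compactions recorded anywhere.
        Map<String, Map<Integer, UUID>> unfinishedCompactions = ImmutableMap.of();
        String ksAndCFName = "ks.cf"; // stand-in for cfm.ksAndCFName
        Map<Integer, UUID> forTable =
                unfinishedCompactions.getOrDefault(ksAndCFName, ImmutableMap.of());
        System.out.println(forTable.isEmpty()); // true -> log-only dry-run path
    }
}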
