Skip to content

Commit

Permalink
Write-ahead log dry run (#562)
Browse files Browse the repository at this point in the history
  • Loading branch information
wi11dey authored Oct 28, 2024
1 parent 297eae1 commit eb23f44
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 25 deletions.
54 changes: 37 additions & 17 deletions src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,63 @@

package com.palantir.cassandra.db;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

import com.palantir.cassandra.db.compaction.IColumnFamilyStoreWriteAheadLogger;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.io.sstable.Descriptor;


public class ColumnFamilyStoreManager implements IColumnFamilyStoreValidator
public class ColumnFamilyStoreManager implements IColumnFamilyStoreValidator, IColumnFamilyStoreWriteAheadLogger
{
private static final IColumnFamilyStoreValidator NO_OP_VALIDATOR = (_cfMetaData, sstableToCompletedAncestors, _unfinishedCompactions) -> sstableToCompletedAncestors;
private static final IColumnFamilyStoreWriteAheadLogger NO_OP_WRITE_AHEAD_LOGGER = (cfMetaData, descriptors) -> {};

public static final ColumnFamilyStoreManager instance = new ColumnFamilyStoreManager();
private final List<IColumnFamilyStoreValidator> validators;
private volatile IColumnFamilyStoreValidator validator = NO_OP_VALIDATOR;
private volatile IColumnFamilyStoreWriteAheadLogger writeAheadLogger = NO_OP_WRITE_AHEAD_LOGGER;

private ColumnFamilyStoreManager()
{
this.validators = new CopyOnWriteArrayList<>();
}
private ColumnFamilyStoreManager() {}

public void registerValidator(IColumnFamilyStoreValidator validator)
{
validators.add(validator);
this.validator = validator;
}

public void unregisterValidator(IColumnFamilyStoreValidator validator)
public void unregisterValidator()
{
validators.remove(validator);
this.validator = NO_OP_VALIDATOR;
}

public void registerWriteAheadLogger(IColumnFamilyStoreWriteAheadLogger writeAheadLogger) {
this.writeAheadLogger = writeAheadLogger;
}

public void unregisterWriteAheadLogger() {
this.writeAheadLogger = NO_OP_WRITE_AHEAD_LOGGER;
}

@Override
public Map<Descriptor, Set<Integer>> filterValidAncestors(CFMetaData cfMetaData, Map<Descriptor, Set<Integer>> sstableToCompletedAncestors, Map<Integer, UUID> unfinishedCompactions)
{
Map<Descriptor, Set<Integer>> filtered = sstableToCompletedAncestors;
for (IColumnFamilyStoreValidator validator : validators)
{
filtered = validator.filterValidAncestors(cfMetaData, filtered, unfinishedCompactions);
}
return filtered;
return validator.filterValidAncestors(cfMetaData, sstableToCompletedAncestors, unfinishedCompactions);
}

@Override
public boolean shouldRemoveUnusedSstablesBasedOnAncestorMetadata() {
return validator.shouldRemoveUnusedSstablesBasedOnAncestorMetadata();
}

@Override
public boolean shouldSkipAncestorCleanupBasedOnAncestorMetadata() {
return validator.shouldSkipAncestorCleanupBasedOnAncestorMetadata();
}

@Override
public void markForDeletion(CFMetaData cfMetaData, Set<Descriptor> descriptors)
{
writeAheadLogger.markForDeletion(cfMetaData, descriptors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,22 @@ public interface IColumnFamilyStoreValidator
*/
Map<Descriptor, Set<Integer>> filterValidAncestors(CFMetaData cfMetaData,
Map<Descriptor, Set<Integer>> sstableToCompletedAncestors, Map<Integer, UUID> unfinishedCompactions);

/**
* @return true if Cassandra should use ancestry metdata to cleanup unused SSTables on startup by running
* {@link org.apache.cassandra.db.ColumnFamilyStore#removeUnusedSstables(CFMetaData, Map)}, false otherwise
* (e.g. if a different cleanup system is being used outside of {@link org.apache.cassandra.service.CassandraDaemon}).
*/
default boolean shouldRemoveUnusedSstablesBasedOnAncestorMetadata() {
return true;
}

/**
* @return true if Cassandra should skip cleaning up ancestors during
* {@link org.apache.cassandra.db.ColumnFamilyStore#removeUnusedSstables(CFMetaData, Map)}, false otherwise. Note
* that this flag does not control whether compaction products being cleaned up.
*/
default boolean shouldSkipAncestorCleanupBasedOnAncestorMetadata() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.db.compaction;

import java.util.Set;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.io.sstable.Descriptor;


public interface IColumnFamilyStoreWriteAheadLogger
{
/**
* Mark {@code descriptors} as a set of SSTables in {@code cfMetaData} needing to be atomically deleted.
*
* This method is assumed to be thread-safe.
* Implementors are resposible for synchronization, and treat {@code cfMetaData} as the smallest unit on which to
* synchronize on.
*
* @param cfMetaData Metadata of the column family in which these descriptors reside.
* @param descriptors Set of SSTable descriptors to be atomically deleted. Guaranteed to be in the same columnfamily.
*
* @throws RuntimeException Signals that writing to the write-ahead log failed, and that none of the descriptors
* should be deleted in this runtime.
*/
void markForDeletion(CFMetaData cfMetaData, Set<Descriptor> descriptors);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

import com.palantir.cassandra.db.ColumnFamilyStoreManager;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
Expand Down Expand Up @@ -235,6 +236,8 @@ public boolean apply(SSTableReader sstable)
collector.finishCompaction(ci);
}

ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors());

// log a bunch of statistics about the result and save to system table compaction_history
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
long startsize = SSTableReader.getTotalBytes(transaction.originals());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.cassandra.db.lifecycle;

import java.util.*;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
Expand All @@ -28,6 +29,7 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
import org.apache.cassandra.utils.concurrent.Transactional;
Expand All @@ -53,7 +55,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
* has two instances, one containing modifications that are "staged" (i.e. invisible)
* and one containing those "logged" that have been made visible through a call to checkpoint()
*/
private static class State
public static class State
{
// readers that are either brand new, update a previous new reader, or update one of the original readers
final Set<SSTableReader> update = new HashSet<>();
Expand Down Expand Up @@ -84,6 +86,12 @@ void clear()
obsolete.clear();
}

public Set<Descriptor> obsoleteDescriptors() {
return obsolete.stream()
.map(ssTableReader -> ssTableReader.descriptor)
.collect(Collectors.toSet());
}

@Override
public String toString()
{
Expand All @@ -104,7 +112,7 @@ public String toString()
private final Set<UniqueIdentifier> identities = Collections.newSetFromMap(new IdentityHashMap<UniqueIdentifier, Boolean>());

// changes that have been made visible
private final State logged = new State();
public final State logged = new State();
// changes that are pending
private final State staged = new State();

Expand Down
21 changes: 16 additions & 5 deletions src/java/org/apache/cassandra/io/sstable/SSTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
Expand All @@ -29,6 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.BufferDecoratedKey;
import org.apache.cassandra.db.DecoratedKey;
Expand All @@ -38,7 +41,6 @@
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.memory.HeapAllocator;
import org.apache.cassandra.utils.Pair;

Expand Down Expand Up @@ -105,20 +107,29 @@ public static boolean delete(Descriptor desc, Set<Component> components)
{
// remove the DATA component first if it exists
if (components.contains(Component.DATA))
FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA));
deleteWithLog(FileUtils::deleteWithConfirm, desc, Component.DATA);
for (Component component : components)
{
if (component.equals(Component.DATA) || component.equals(Component.SUMMARY))
continue;

FileUtils.deleteWithConfirm(desc.filenameFor(component));
deleteWithLog(FileUtils::deleteWithConfirm, desc, component);
}
FileUtils.delete(desc.filenameFor(Component.SUMMARY));
deleteWithLog(FileUtils::delete, desc, Component.SUMMARY);

logger.trace("Deleted {}", desc);
return true;
}

private static void deleteWithLog(Consumer<String> deleter, Descriptor desc, Component component) {
String filename = desc.filenameFor(component);
deleter.accept(filename);
logger.info("Deleted SSTable file {}",
UnsafeArg.of("filename", filename),
SafeArg.of("keyspace", desc.ksname),
SafeArg.of("columnFamily", desc.cfname),
SafeArg.of("relativeFilename", desc.relativeFilenameFor(component)));
}

/**
* If the given @param key occupies only part of a larger buffer, allocate a new buffer that is only
* as large as necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1990,7 +1990,7 @@ public void testRemoveUnusedSstablesOnlyRemovesFiltered() throws IOException
}
finally
{
ColumnFamilyStoreManager.instance.unregisterValidator(validator);
ColumnFamilyStoreManager.instance.unregisterValidator();
}

sstables = dir.sstableLister().list();
Expand Down

0 comments on commit eb23f44

Please sign in to comment.