diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 084f9f56d8..f1daf1c430 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -49,6 +49,7 @@ import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.sstable.metadata.MetadataComponent; import org.json.simple.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -165,6 +166,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean @VisibleForTesting public static volatile ColumnFamilyStore discardFlushResults; + public static volatile boolean validateCompactionAncestors; public final Keyspace keyspace; public final String name; @@ -699,8 +701,15 @@ public static void removeUnfinishedCompactionLeftovers(CFMetaData metadata, Map< Set ancestors; try { - CompactionMetadata compactionMetadata = (CompactionMetadata) desc.getMetadataSerializer().deserialize(desc, MetadataType.COMPACTION); - ancestors = compactionMetadata.ancestors; + Map compactionMetadata = desc.getMetadataSerializer().deserialize(desc, EnumSet.of(MetadataType.COMPACTION, MetadataType.VALID_ANCESTORS)); + if (compactionMetadata.get(MetadataType.VALID_ANCESTORS) != null || !validateCompactionAncestors) + { + ancestors = ((CompactionMetadata) compactionMetadata.get(MetadataType.COMPACTION)).ancestors; + } + else + { + ancestors = Collections.emptySet(); + } } catch (IOException e) { @@ -735,8 +744,15 @@ public static void removeUnfinishedCompactionLeftovers(CFMetaData metadata, Map< if (completedAncestors.contains(desc.generation)) { // if any of the ancestors were participating in a compaction, finish that compaction - logger.info("Going to delete leftover compaction ancestor {}", desc); - SSTable.delete(desc, sstableFiles.getValue()); + if (Boolean.getBoolean("palantir_cassandra.dry_run_ancestor_deletion")) + { + logger.info("Would have deleted leftover compaction ancestor {} if palantir_cassandra.dry_run_ancestor_deletion was false", desc); + } + else + { + logger.info("Going to delete leftover compaction ancestor {}", desc); + SSTable.delete(desc, sstableFiles.getValue()); + } UUID compactionTaskID = unfinishedCompactions.get(desc.generation); if (compactionTaskID != null) SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation)); @@ -833,6 +849,7 @@ public synchronized int loadNewSSTablesWithCount(boolean assumeCfIsEmpty) for (Map.Entry> entry : lister.list().entrySet()) { Descriptor descriptor = entry.getKey(); + Set components = entry.getValue(); if (currentDescriptors.contains(descriptor)) continue; // old (initialized) SSTable found, skipping @@ -856,28 +873,39 @@ public synchronized int loadNewSSTablesWithCount(boolean assumeCfIsEmpty) continue; } - // Increment the generation until we find a filename that doesn't exist. This is needed because the new - // SSTables that are being loaded might already use these generation numbers. Descriptor newDescriptor; - do + if (assumeCfIsEmpty) { - newDescriptor = new Descriptor(descriptor.version, - descriptor.directory, - descriptor.ksname, - descriptor.cfname, - fileIndexGenerator.incrementAndGet(), - Descriptor.Type.FINAL, - descriptor.formatType); + newDescriptor = descriptor; } - while (new File(newDescriptor.filenameFor(Component.DATA)).exists()); + else + { + // Increment the generation until we find a filename that doesn't exist. This is needed because the new + // SSTables that are being loaded might already use these generation numbers. + do + { + newDescriptor = new Descriptor(descriptor.version, + descriptor.directory, + descriptor.ksname, + descriptor.cfname, + fileIndexGenerator.incrementAndGet(), + Descriptor.Type.FINAL, + descriptor.formatType); + } + while (new File(newDescriptor.filenameFor(Component.DATA)).exists()); - logger.info("Renaming new SSTable {} to {}", descriptor, newDescriptor); - SSTableWriter.rename(descriptor, newDescriptor, entry.getValue()); + logger.info("Removing Statistics.db for new SSTable {} to clear old ancestor metadata", descriptor); + FileUtils.delete(new File(descriptor.filenameFor(Component.STATS))); + components = Sets.difference(components, ImmutableSet.of(Component.STATS)); + + logger.info("Renaming new SSTable {} to {}", descriptor, newDescriptor); + SSTableWriter.rename(descriptor, newDescriptor, components); + } SSTableReader reader; try { - reader = SSTableReader.open(newDescriptor, entry.getValue(), metadata, partitioner); + reader = SSTableReader.open(newDescriptor, components, metadata, partitioner); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index de2e89d94b..a1461e06d7 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -1118,6 +1118,16 @@ public static void snapshotOnVersionChange() throws IOException } } + public static boolean isPreviousVersionGreaterThanOrEqual(CassandraVersion version) + { + String previousVersion = getPreviousVersionString(); + if (previousVersion.equals(NULL_VERSION.toString()) || previousVersion.equals(UNREADABLE_VERSION.toString())) + { + return true; + } + return version.compareTo(new CassandraVersion(previousVersion)) > 0; + } + /** * Try to determine what the previous version, if any, was installed on this node. * Primary source of truth is the release version in system.local. If the previous @@ -1129,7 +1139,7 @@ public static void snapshotOnVersionChange() throws IOException * indicating either no previous version (SystemUpgrade.NULL_VERSION) or an unreadable, * legacy version (SystemUpgrade.UNREADABLE_VERSION). */ - private static String getPreviousVersionString() + public static String getPreviousVersionString() { String req = "SELECT release_version FROM system.%s WHERE key='%s'"; UntypedResultSet result = executeInternal(String.format(req, SystemKeyspace.LOCAL, SystemKeyspace.LOCAL)); diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 579ff7a02d..88afa3b5ba 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -295,6 +295,7 @@ public Map finalizeMetadata(String partitioner, hasLegacyCounterShards, repairedAt)); components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality)); + components.put(MetadataType.VALID_ANCESTORS, ValidAncestorsMetadata.instance); return components; } } diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java index 6120bafc4b..031df92e50 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java @@ -122,8 +122,8 @@ public Map deserialize(Descriptor descriptor, F { in.seek(offset); component = type.serializer.deserialize(descriptor.version, in); + components.put(type, component); } - components.put(type, component); } return components; } diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java index 9717da1591..04c97f3622 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.io.sstable.metadata; +import org.apache.cassandra.utils.CassandraVersion; + /** * Defines Metadata component type. */ @@ -27,7 +29,12 @@ public enum MetadataType /** Metadata only used at compaction */ COMPACTION(CompactionMetadata.serializer), /** Metadata always keep in memory */ - STATS(StatsMetadata.serializer); + STATS(StatsMetadata.serializer), + /** Meta-metadata about whether the ancestors metadata is valid **/ + VALID_ANCESTORS(ValidAncestorsMetadata.serializer); + + // Update before release: + public static final CassandraVersion VALID_ANCESTORS_VERSION = new CassandraVersion("2.2.18-1.161.0"); public final IMetadataComponentSerializer serializer; diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/ValidAncestorsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/ValidAncestorsMetadata.java new file mode 100644 index 0000000000..69092d3f64 --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/metadata/ValidAncestorsMetadata.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.sstable.metadata; + +import java.io.DataInput; + +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * Marker metadata component indicating this SSTable has valid compaction ancestor information. + */ +public class ValidAncestorsMetadata extends MetadataComponent +{ + public static final IMetadataComponentSerializer serializer = new ValidAncestorsMetadataSerializer(); + public static final ValidAncestorsMetadata instance = new ValidAncestorsMetadata(); + + private ValidAncestorsMetadata() {} + + public MetadataType getType() + { + return MetadataType.VALID_ANCESTORS; + } + + public static class ValidAncestorsMetadataSerializer implements IMetadataComponentSerializer + { + @Override + public int serializedSize(ValidAncestorsMetadata component, Version version) + { + return 0; + } + + @Override + public void serialize(ValidAncestorsMetadata component, Version version, DataOutputPlus out) {} + + @Override + public ValidAncestorsMetadata deserialize(Version version, DataInput in) + { + return instance; + } + } +} diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index b0ff5c0a11..e0d1de50ce 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -54,6 +55,7 @@ import com.codahale.metrics.jvm.GarbageCollectorMetricSet; import com.codahale.metrics.jvm.MemoryUsageGaugeSet; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -74,6 +76,7 @@ import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.StartupException; +import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.DefaultNameFactory; @@ -215,6 +218,13 @@ protected void setup() exitOrFail(3, e.getMessage(), e.getCause()); } + ColumnFamilyStore.validateCompactionAncestors = Optional.of(SystemKeyspace.getPreviousVersionString()) + .filter(Predicates.not(SystemKeyspace.NULL_VERSION.toString()::equals)) + .filter(Predicates.not(SystemKeyspace.UNREADABLE_VERSION.toString()::equals)) + .map(CassandraVersion::new) + .map(previousVersion -> previousVersion.compareTo(MetadataType.VALID_ANCESTORS_VERSION) >= 0) + .orElse(true); + // We need to persist this as soon as possible after startup checks. // This should be the first write to SystemKeyspace (CASSANDRA-11742) SystemKeyspace.persistLocalMetadata(); diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java index 831901439c..f75c4c76ff 100644 --- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java +++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java @@ -54,6 +54,7 @@ public static void main(String[] args) throws IOException ValidationMetadata validation = (ValidationMetadata) metadata.get(MetadataType.VALIDATION); StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS); CompactionMetadata compaction = (CompactionMetadata) metadata.get(MetadataType.COMPACTION); + ValidAncestorsMetadata validAncestors = (ValidAncestorsMetadata) metadata.get(MetadataType.VALID_ANCESTORS); out.printf("SSTable: %s%n", descriptor); if (validation != null) @@ -81,7 +82,15 @@ public static void main(String[] args) throws IOException } if (compaction != null) { - out.printf("Ancestors: %s%n", compaction.ancestors.toString()); + out.printf("Ancestors: %s", compaction.ancestors.toString()); + if (validAncestors != null) + { + out.println(" (considered valid)"); + } + else + { + out.println(" (considered invalid)"); + } out.printf("Estimated cardinality: %s%n", compaction.cardinalityEstimator.cardinality()); } diff --git a/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java index 6d07f1c3ed..d3d3ba532a 100644 --- a/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/format/SSTableReaderTest.java @@ -196,26 +196,6 @@ public void testSpannedIndexPositions() throws IOException } } - @Test - public void testPersistentStatistics() - { - - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); - - for (int j = 0; j < 100; j += 2) - { - ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); - Mutation rm = new Mutation(KEYSPACE1, key); - rm.add("Standard1", cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); - rm.applyUnsafe(); - } - store.forceBlockingFlush(); - - clearAndLoad(store); - assert store.metric.maxRowSize.getValue() != 0; - } - private void clearAndLoad(ColumnFamilyStore cfs) { cfs.clearUnsafe();