Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Guarding Writes by Logical Timestamps #569

Draft
wants to merge 6 commits into
base: palantir-cassandra-2.2.18
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.logicalts;

import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;

public class CollectionBasedFrozenTimestampTracker implements FrozenTimestampTracker
{
public static final FrozenTimestampTracker INSTANCE = new CollectionBasedFrozenTimestampTracker();
private final NavigableSet<Long> activeReaders = new ConcurrentSkipListSet<>();
private static final long UNINITIALIZED_TIMESTAMP = -1;

private final AtomicLong frozenTimestamp = new AtomicLong(UNINITIALIZED_TIMESTAMP);

@Override
public void advanceFrozenTimestamp(long desiredTimestamp) throws InProgressMutationConflictException
{
if (timestampIsLocked(desiredTimestamp))
{
throw new InProgressMutationConflictException("There are in-progress mutations before the sweep timestamp. Sweep can not proceed.");
}
frozenTimestamp.updateAndGet(current -> Long.max(current, desiredTimestamp));
}

private boolean timestampIsLocked(long desiredTimestamp)
{
return activeReaders.floor(desiredTimestamp) != null;
}

@Override
public UncheckedAutoCloseable checkAndLockForMutation(long mutationTimestamp) throws IllegalLogicalTimestampException
{
long frozenTimestamp = this.frozenTimestamp.get();
if (frozenTimestamp == UNINITIALIZED_TIMESTAMP)
{
throw new IllegalLogicalTimestampException("Frozen timestamp has not been initialized");
}
if (mutationTimestamp < frozenTimestamp)
{
throw new IllegalLogicalTimestampException("Mutation start timestamp is before the frozen timestamp");
}

activeReaders.add(mutationTimestamp);

if (mutationTimestamp < this.frozenTimestamp.get())
{
activeReaders.remove(mutationTimestamp);
throw new IllegalLogicalTimestampException("Frozen timestamp was updated before we were able to lock our start timestamp");
}

// TODO: How can we ensure that this closable is ALWAYS executed? Or expire stale locks?
return () -> activeReaders.remove(mutationTimestamp);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.logicalts;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.Iterables;

import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.io.sstable.ColumnStats;

public class FrozenTimestampMutationVerifier implements MutationVerifier
{
private final FrozenTimestampTrackerFactory frozenTimestampTrackerFactory;
private final Map<String, FrozenTimestampTracker> namespaceToFrozenTimestampTracker;

public FrozenTimestampMutationVerifier(FrozenTimestampTrackerFactory frozenTimestampTrackerFactory)
{
this.frozenTimestampTrackerFactory = frozenTimestampTrackerFactory;
this.namespaceToFrozenTimestampTracker = new ConcurrentHashMap<>();
}

@Override
public UncheckedAutoCloseable verifyMutations(Collection<? extends IMutation> mutations) throws IllegalLogicalTimestampException
{
if (mutations.size() == 1)
{
return verifyMutation(Iterables.getOnlyElement(mutations));
}
Map<String, Long> keyspaceToMaxWriteTimestamp = computeKeyspaceToMaxWriteTimestamp(mutations);
List<UncheckedAutoCloseable> locks = acquireAllLocks(keyspaceToMaxWriteTimestamp);

return () -> locks.forEach(UncheckedAutoCloseable::close);
}

@Override
public UncheckedAutoCloseable verifyMutation(IMutation mutation) throws IllegalLogicalTimestampException
{
return acquireLock(mutation.getKeyspaceName(), maxTimestamp(mutation));
}

private List<UncheckedAutoCloseable> acquireAllLocks(Map<String, Long> keyspaceToMaxWriteTimestamp) throws IllegalLogicalTimestampException
{
List<UncheckedAutoCloseable> locks = new ArrayList<>();

for (Map.Entry<String, Long> keyspaceAndMaxWriteTimestamp : keyspaceToMaxWriteTimestamp.entrySet())
{
String keyspaceName = keyspaceAndMaxWriteTimestamp.getKey();
Long maxWriteTimestamp = keyspaceAndMaxWriteTimestamp.getValue();
locks.add(acquireLock(keyspaceName, maxWriteTimestamp));
}
return locks;
}

private UncheckedAutoCloseable acquireLock(String keyspaceName, Long maxWriteTimestamp) throws IllegalLogicalTimestampException
{
return namespaceToFrozenTimestampTracker.computeIfAbsent(
keyspaceName,
ignored -> frozenTimestampTrackerFactory.create()).checkAndLockForMutation(maxWriteTimestamp);
}

private static Map<String, Long> computeKeyspaceToMaxWriteTimestamp(Collection<? extends IMutation> mutations)
{
Map<String, Long> keyspaceToMaxWriteTimestamp = new HashMap<>();

for (IMutation mutation : mutations)
{
String keyspaceName = mutation.getKeyspaceName();
long maxTimestamp = maxTimestamp(mutation);
keyspaceToMaxWriteTimestamp.compute(
keyspaceName,
(ignored, current) -> current != null ? Long.max(current, maxTimestamp) : maxTimestamp);
}
return keyspaceToMaxWriteTimestamp;
}

private static long maxTimestamp(IMutation mutation)
{
ColumnStats.MaxLongTracker tracker = new ColumnStats.MaxLongTracker(Long.MIN_VALUE);
for (ColumnFamily columnFamily : mutation.getColumnFamilies())
{
for (Cell cell : columnFamily)
{
tracker.update(cell.timestamp());
}
}
return tracker.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.logicalts;

public interface FrozenTimestampTracker
{
/**
* Advance the frozenTimestamp to the desiredTimestamp. This method throws
* if there are in-progress mutations holding a lock at or before the desiredTimestamp.
* If the implementation persists the frozenTimestamp, the timestamp
* must be persisted durably before this method returns.
*
* @param desiredTimestamp An observed Sweep timestamp
* @throws InProgressMutationConflictException if there are in-progress mutations holding a lock at or before the given timestamp
*/
void advanceFrozenTimestamp(long desiredTimestamp) throws InProgressMutationConflictException;

/**
* Lock a Mutation timestamp. This lock must be held while applying a Mutation.
*
* @param mutationTimestamp The write time of the Mutation. We will hold a lock on this timestamp,
* and the frozen timestamp will not be allowed to increase past this
* timestamp while we hold this lock.
* @return An AutoClosable that releases the lock on the mutationTimestamp when closed
* @throws IllegalLogicalTimestampException if the provided startTimestamp < the current Frozen Timestamp
*/
UncheckedAutoCloseable checkAndLockForMutation(long mutationTimestamp) throws IllegalLogicalTimestampException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.logicalts;

@FunctionalInterface
public interface FrozenTimestampTrackerFactory
{
FrozenTimestampTracker create();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.logicalts;

public class IllegalLogicalTimestampException extends Exception
{
public IllegalLogicalTimestampException(String message)
{
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.logicalts;

public class InProgressMutationConflictException extends Exception
{
public InProgressMutationConflictException(String message)
{
super(message);
}
}
Loading