Skip to content

Commit

Permalink
Refactored concurrent local slashing protection (#7858)
Browse files Browse the repository at this point in the history
* Refactored concurrent local slashing protection

 - broke out to its own structures
 - made implementation clearer
 - consolidated objects to use a single map and lock

 This removes the need for several concurrent maps through abstraction, makes it easier to understand the flow, and removes a lot of the complicated flows in the initial implementation.

 The configuration is also simpler, with instantiation of the object taking place in the client service, clearly separating the 2 implementations.

 Default is still to use the old functionality for now.

Signed-off-by: Paul Harris <[email protected]>
  • Loading branch information
rolfyone authored Jan 8, 2024
1 parent b2756d2 commit 54326de
Show file tree
Hide file tree
Showing 7 changed files with 407 additions and 222 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.spec.signatures;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.ethereum.signingrecord.ValidatorSigningRecord;
import tech.pegasys.teku.infrastructure.io.SyncDataAccessor;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

// Intended to use only in `LocalSlashingProtectorConcurrentAccess`
class LocalSlashingProtectionRecord {
private final Path slashingProtectedPath;
// In the same way as the MAP in LocalSlashingProtector, signingRecord gets maintained over time
private ValidatorSigningRecord signingRecord;

private final ReentrantLock lock;

LocalSlashingProtectionRecord(
final Path slashingProtectedPath,
final ValidatorSigningRecord signingRecord,
final ReentrantLock lock) {
this.slashingProtectedPath = slashingProtectedPath;
this.signingRecord = signingRecord;
this.lock = lock;
}

@VisibleForTesting
ReentrantLock getLock() {
return lock;
}

void lock() {
lock.lock();
}

void unlock() {
lock.unlock();
}

ValidatorSigningRecord getSigningRecord() {
return signingRecord;
}

boolean writeSigningRecord(
final SyncDataAccessor dataAccessor, final Optional<ValidatorSigningRecord> maybeRecord)
throws IOException {
if (maybeRecord.isEmpty()) {
return false;
}
dataAccessor.syncedWrite(slashingProtectedPath, maybeRecord.get().toBytes());
this.signingRecord = maybeRecord.get();
return true;
}

Optional<ValidatorSigningRecord> maySignBlock(Bytes32 genesisValidatorsRoot, UInt64 slot) {
return signingRecord.maySignBlock(genesisValidatorsRoot, slot);
}

Optional<ValidatorSigningRecord> maySignAttestation(
Bytes32 genesisValidatorsRoot, UInt64 sourceEpoch, UInt64 targetEpoch) {
return signingRecord.maySignAttestation(genesisValidatorsRoot, sourceEpoch, targetEpoch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,11 @@

package tech.pegasys.teku.spec.signatures;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.ethereum.signingrecord.ValidatorSigningRecord;
Expand All @@ -31,66 +27,19 @@

public class LocalSlashingProtector implements SlashingProtector {

private static final Logger LOG = LogManager.getLogger();

private final Map<BLSPublicKey, ValidatorSigningRecord> signingRecords =
new ConcurrentHashMap<>();
private final Map<BLSPublicKey, Path> slashingProtectionPath = new ConcurrentHashMap<>();
private final Map<BLSPublicKey, ReentrantLock> validatorLocks = new ConcurrentHashMap<>();

private final Map<BLSPublicKey, ValidatorSigningRecord> signingRecords = new HashMap<>();
private final Map<BLSPublicKey, Path> slashingProtectionPath = new HashMap<>();
private final SyncDataAccessor dataAccessor;
private final Path slashingProtectionBaseDir;

private final boolean localSlashingProtectionSynchronizedModeEnabled;

public LocalSlashingProtector(
final SyncDataAccessor dataAccessor,
final Path slashingProtectionBaseDir,
boolean localSlashingProtectionSynchronizedModeEnabled) {
final SyncDataAccessor dataAccessor, final Path slashingProtectionBaseDir) {
this.dataAccessor = dataAccessor;
this.slashingProtectionBaseDir = slashingProtectionBaseDir;
this.localSlashingProtectionSynchronizedModeEnabled =
localSlashingProtectionSynchronizedModeEnabled;
if (!localSlashingProtectionSynchronizedModeEnabled) {
LOG.info("Local slashing protection running with local locks enabled");
}
}

@Override
public SafeFuture<Boolean> maySignBlock(
final BLSPublicKey validator, final Bytes32 genesisValidatorsRoot, final UInt64 slot) {
return localSlashingProtectionSynchronizedModeEnabled
? maySignBlockSynchronized(validator, genesisValidatorsRoot, slot)
: maySignBlockWithLocking(validator, genesisValidatorsRoot, slot);
}

@Override
public SafeFuture<Boolean> maySignAttestation(
final BLSPublicKey validator,
final Bytes32 genesisValidatorsRoot,
final UInt64 sourceEpoch,
final UInt64 targetEpoch) {
return localSlashingProtectionSynchronizedModeEnabled
? maySignAttestationSynchronized(validator, genesisValidatorsRoot, sourceEpoch, targetEpoch)
: maySignAttestationWithLocking(validator, genesisValidatorsRoot, sourceEpoch, targetEpoch);
}

private SafeFuture<Boolean> maySignBlockWithLocking(
final BLSPublicKey validator, final Bytes32 genesisValidatorsRoot, final UInt64 slot) {
return SafeFuture.of(
() -> {
final ReentrantLock lock = acquireLock(validator);
try {
final ValidatorSigningRecord signingRecord =
loadOrCreateSigningRecord(validator, genesisValidatorsRoot);
return handleResult(validator, signingRecord.maySignBlock(genesisValidatorsRoot, slot));
} finally {
lock.unlock();
}
});
}

private synchronized SafeFuture<Boolean> maySignBlockSynchronized(
public synchronized SafeFuture<Boolean> maySignBlock(
final BLSPublicKey validator, final Bytes32 genesisValidatorsRoot, final UInt64 slot) {
return SafeFuture.of(
() -> {
Expand All @@ -100,7 +49,8 @@ private synchronized SafeFuture<Boolean> maySignBlockSynchronized(
});
}

private synchronized SafeFuture<Boolean> maySignAttestationSynchronized(
@Override
public synchronized SafeFuture<Boolean> maySignAttestation(
final BLSPublicKey validator,
final Bytes32 genesisValidatorsRoot,
final UInt64 sourceEpoch,
Expand All @@ -115,26 +65,6 @@ private synchronized SafeFuture<Boolean> maySignAttestationSynchronized(
});
}

private SafeFuture<Boolean> maySignAttestationWithLocking(
final BLSPublicKey validator,
final Bytes32 genesisValidatorsRoot,
final UInt64 sourceEpoch,
final UInt64 targetEpoch) {
return SafeFuture.of(
() -> {
final ReentrantLock lock = acquireLock(validator);
try {
final ValidatorSigningRecord signingRecord =
loadOrCreateSigningRecord(validator, genesisValidatorsRoot);
return handleResult(
validator,
signingRecord.maySignAttestation(genesisValidatorsRoot, sourceEpoch, targetEpoch));
} finally {
lock.unlock();
}
});
}

private Boolean handleResult(
final BLSPublicKey validator, final Optional<ValidatorSigningRecord> newRecord)
throws IOException {
Expand Down Expand Up @@ -170,17 +100,6 @@ private ValidatorSigningRecord loadOrCreateSigningRecord(
});
}

ReentrantLock acquireLock(final BLSPublicKey validator) {
final ReentrantLock lock = validatorLocks.computeIfAbsent(validator, __ -> new ReentrantLock());
lock.lock();
return lock;
}

@VisibleForTesting
ReentrantLock getLock(final BLSPublicKey validator) {
return validatorLocks.computeIfAbsent(validator, __ -> new ReentrantLock());
}

private void writeSigningRecord(final BLSPublicKey validator, final ValidatorSigningRecord record)
throws IOException {
dataAccessor.syncedWrite(validatorRecordPath(validator), record.toBytes());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.spec.signatures;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.ethereum.signingrecord.ValidatorSigningRecord;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.io.SyncDataAccessor;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public class LocalSlashingProtectorConcurrentAccess implements SlashingProtector {
private static final Logger LOG = LogManager.getLogger();
private final Map<BLSPublicKey, LocalSlashingProtectionRecord> records =
new ConcurrentHashMap<>();
private final SyncDataAccessor dataAccessor;
private final Path slashingProtectionBaseDir;

public LocalSlashingProtectorConcurrentAccess(
final SyncDataAccessor dataAccessor, final Path slashingProtectionBaseDir) {
this.dataAccessor = dataAccessor;
this.slashingProtectionBaseDir = slashingProtectionBaseDir;
}

@Override
public SafeFuture<Boolean> maySignBlock(
final BLSPublicKey validator, final Bytes32 genesisValidatorsRoot, final UInt64 slot) {
return SafeFuture.of(
() -> {
final LocalSlashingProtectionRecord record =
getOrCreateSigningRecord(validator, genesisValidatorsRoot);
record.lock();
try {
return record.writeSigningRecord(
dataAccessor, record.maySignBlock(genesisValidatorsRoot, slot));
} finally {
record.unlock();
}
});
}

@Override
public SafeFuture<Boolean> maySignAttestation(
final BLSPublicKey validator,
final Bytes32 genesisValidatorsRoot,
final UInt64 sourceEpoch,
final UInt64 targetEpoch) {
return SafeFuture.of(
() -> {
final LocalSlashingProtectionRecord record =
getOrCreateSigningRecord(validator, genesisValidatorsRoot);
record.lock();
try {
return record.writeSigningRecord(
dataAccessor,
record.maySignAttestation(genesisValidatorsRoot, sourceEpoch, targetEpoch));
} finally {
record.unlock();
}
});
}

@Override
public Optional<ValidatorSigningRecord> getSigningRecord(final BLSPublicKey validator) {
final Optional<LocalSlashingProtectionRecord> maybeRecord =
Optional.ofNullable(records.get(validator));
if (maybeRecord.isEmpty()) {
// not loaded yet, just get from file if available, can create structure on use.
return getValidatorSigningRecordFromFile(validator);
}
final LocalSlashingProtectionRecord record = maybeRecord.get();
try {
record.lock();
return Optional.of(record.getSigningRecord());
} finally {
record.unlock();
}
}

@VisibleForTesting
LocalSlashingProtectionRecord getOrCreateSigningRecord(
final BLSPublicKey validator, final Bytes32 genesisValidatorsRoot) {
return records.computeIfAbsent(validator, __ -> addRecord(validator, genesisValidatorsRoot));
}

private LocalSlashingProtectionRecord addRecord(
final BLSPublicKey publicKey, final Bytes32 genesisValidatorsRoot) {
final Path slashingProtectedPath =
slashingProtectionBaseDir.resolve(
publicKey.toBytesCompressed().toUnprefixedHexString() + ".yml");
final Optional<ValidatorSigningRecord> maybeRecord =
getValidatorSigningRecordFromFile(publicKey);
return new LocalSlashingProtectionRecord(
slashingProtectedPath,
maybeRecord.orElse(new ValidatorSigningRecord(genesisValidatorsRoot)),
new ReentrantLock());
}

private Optional<ValidatorSigningRecord> getValidatorSigningRecordFromFile(
final BLSPublicKey publicKey) {

final Path slashingProtectedPath =
slashingProtectionBaseDir.resolve(
publicKey.toBytesCompressed().toUnprefixedHexString() + ".yml");
try {
return dataAccessor.read(slashingProtectedPath).map(ValidatorSigningRecord::fromBytes);
} catch (IOException e) {
LOG.error("Failed to load validator signing record {}", publicKey, e);
return Optional.empty();
}
}
}
Loading

0 comments on commit 54326de

Please sign in to comment.