From 29db0c0ef8bb5f33bf1dac088e2b74bc8ab312be Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 31 Jan 2022 21:38:47 +0100 Subject: [PATCH] rely on internal ownedValidator to calculate validator indices --- .../validator/client/AbstractDutyLoader.java | 2 +- .../client/ValidatorIndexProvider.java | 27 ++++++++++++------- .../client/AbstractDutySchedulerTest.java | 2 +- .../client/AttestationDutyLoaderTest.java | 2 +- .../client/SyncCommitteeDutyLoaderTest.java | 2 +- .../client/ValidatorIndexProviderTest.java | 2 +- 6 files changed, 23 insertions(+), 14 deletions(-) diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AbstractDutyLoader.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AbstractDutyLoader.java index 15dd5180246..100365bfebf 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AbstractDutyLoader.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AbstractDutyLoader.java @@ -38,7 +38,7 @@ protected AbstractDutyLoader( public SafeFuture> loadDutiesForEpoch(final UInt64 epoch) { LOG.trace("Requesting duties for epoch {}", epoch); return validatorIndexProvider - .getValidatorIndices(validators.getPublicKeys()) + .getValidatorIndices() .thenCompose( validatorIndices -> { if (validatorIndices.isEmpty()) { diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorIndexProvider.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorIndexProvider.java index 8f7371e1f69..0b8779f2db2 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorIndexProvider.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorIndexProvider.java @@ -13,7 +13,6 @@ package tech.pegasys.teku.validator.client; -import static java.util.Collections.unmodifiableMap; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toList; @@ -25,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.bls.BLSPublicKey; @@ -37,7 +38,7 @@ public class ValidatorIndexProvider { private static final Logger LOG = LogManager.getLogger(); public static final Duration RETRY_DELAY = Duration.ofSeconds(5); - private final OwnedValidators validators; + private final OwnedValidators ownedValidators; private final ValidatorApiChannel validatorApiChannel; private final AsyncRunner asyncRunner; private final Map validatorIndexesByPublicKey = new ConcurrentHashMap<>(); @@ -46,10 +47,10 @@ public class ValidatorIndexProvider { private final SafeFuture firstSuccessfulRequest = new SafeFuture<>(); public ValidatorIndexProvider( - final OwnedValidators validators, + final OwnedValidators ownedValidators, final ValidatorApiChannel validatorApiChannel, final AsyncRunner asyncRunner) { - this.validators = validators; + this.ownedValidators = ownedValidators; this.validatorApiChannel = validatorApiChannel; this.asyncRunner = asyncRunner; } @@ -82,7 +83,7 @@ public void lookupValidators() { } private Collection getUnknownValidators() { - return Sets.difference(validators.getPublicKeys(), validatorIndexesByPublicKey.keySet()); + return Sets.difference(ownedValidators.getPublicKeys(), validatorIndexesByPublicKey.keySet()); } private void logNewValidatorIndices(final Map knownValidators) { @@ -100,15 +101,23 @@ public Optional getValidatorIndex(final BLSPublicKey publicKey) { return Optional.ofNullable(validatorIndexesByPublicKey.get(publicKey)); } - public SafeFuture> getValidatorIndices( - final Collection publicKeys) { + public SafeFuture> getValidatorIndices() { // Wait for at least one successful load of validator indices before attempting to read return firstSuccessfulRequest.thenApply( __ -> - publicKeys.stream().flatMap(key -> getValidatorIndex(key).stream()).collect(toList())); + ownedValidators.getActiveValidators().stream() + .flatMap(validator -> getValidatorIndex(validator.getPublicKey()).stream()) + .collect(toList())); } public SafeFuture> getValidatorIndexesByPublicKey() { - return firstSuccessfulRequest.thenApply(__ -> unmodifiableMap(validatorIndexesByPublicKey)); + // Wait for at least one successful load of validator indices before attempting to read + return firstSuccessfulRequest.thenApply( + __ -> + ownedValidators.getActiveValidators().stream() + .map(Validator::getPublicKey) + .collect( + Collectors.toMap( + Function.identity(), validatorIndexesByPublicKey::get))); } } diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AbstractDutySchedulerTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AbstractDutySchedulerTest.java index 31ebaab0027..c2720bf02f9 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AbstractDutySchedulerTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AbstractDutySchedulerTest.java @@ -56,7 +56,7 @@ public abstract class AbstractDutySchedulerTest { @BeforeEach public void setUp() { - when(validatorIndexProvider.getValidatorIndices(VALIDATOR_KEYS)) + when(validatorIndexProvider.getValidatorIndices()) .thenReturn(SafeFuture.completedFuture(VALIDATOR_INDICES)); final SafeFuture rejectAggregationSignature = SafeFuture.failedFuture(new UnsupportedOperationException("This test ignores aggregation")); diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyLoaderTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyLoaderTest.java index ff46a0a6ff9..4a12b8501b4 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyLoaderTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyLoaderTest.java @@ -79,7 +79,7 @@ class AttestationDutyLoaderTest { @BeforeEach void setUp() { - when(validatorIndexProvider.getValidatorIndices(any())) + when(validatorIndexProvider.getValidatorIndices()) .thenReturn(SafeFuture.completedFuture(VALIDATOR_INDICES)); when(forkProvider.getForkInfo(any())).thenReturn(SafeFuture.completedFuture(forkInfo)); } diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/SyncCommitteeDutyLoaderTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/SyncCommitteeDutyLoaderTest.java index 6dc6efc49fb..9e1119fd714 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/SyncCommitteeDutyLoaderTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/SyncCommitteeDutyLoaderTest.java @@ -69,7 +69,7 @@ class SyncCommitteeDutyLoaderTest { @BeforeEach void setUp() { - when(validatorIndexProvider.getValidatorIndices(validators.getPublicKeys())) + when(validatorIndexProvider.getValidatorIndices()) .thenReturn(SafeFuture.completedFuture(validatorIndices)); } diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/ValidatorIndexProviderTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/ValidatorIndexProviderTest.java index fd7e83d8f3b..ee53a4363d7 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/ValidatorIndexProviderTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/ValidatorIndexProviderTest.java @@ -161,7 +161,7 @@ void shouldWaitForFirstSuccessfulRequestBeforeLookingUpValidatorIndices() { final SafeFuture> requestResult = new SafeFuture<>(); when(validatorApiChannel.getValidatorIndices(Set.of(key1))).thenReturn(requestResult); - final SafeFuture> result = provider.getValidatorIndices(List.of(key1)); + final SafeFuture> result = provider.getValidatorIndices(); assertThat(result).isNotDone(); provider.lookupValidators();