Skip to content

Commit

Permalink
rely on internal ownedValidator to calculate validator indices
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr committed Feb 1, 2022
1 parent 5f89f58 commit 29db0c0
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ protected AbstractDutyLoader(
public SafeFuture<Optional<S>> loadDutiesForEpoch(final UInt64 epoch) {
LOG.trace("Requesting duties for epoch {}", epoch);
return validatorIndexProvider
.getValidatorIndices(validators.getPublicKeys())
.getValidatorIndices()
.thenCompose(
validatorIndices -> {
if (validatorIndices.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package tech.pegasys.teku.validator.client;

import static java.util.Collections.unmodifiableMap;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;

Expand All @@ -25,6 +24,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.bls.BLSPublicKey;
Expand All @@ -37,7 +38,7 @@ public class ValidatorIndexProvider {

private static final Logger LOG = LogManager.getLogger();
public static final Duration RETRY_DELAY = Duration.ofSeconds(5);
private final OwnedValidators validators;
private final OwnedValidators ownedValidators;
private final ValidatorApiChannel validatorApiChannel;
private final AsyncRunner asyncRunner;
private final Map<BLSPublicKey, Integer> validatorIndexesByPublicKey = new ConcurrentHashMap<>();
Expand All @@ -46,10 +47,10 @@ public class ValidatorIndexProvider {
private final SafeFuture<Void> firstSuccessfulRequest = new SafeFuture<>();

public ValidatorIndexProvider(
final OwnedValidators validators,
final OwnedValidators ownedValidators,
final ValidatorApiChannel validatorApiChannel,
final AsyncRunner asyncRunner) {
this.validators = validators;
this.ownedValidators = ownedValidators;
this.validatorApiChannel = validatorApiChannel;
this.asyncRunner = asyncRunner;
}
Expand Down Expand Up @@ -82,7 +83,7 @@ public void lookupValidators() {
}

private Collection<BLSPublicKey> getUnknownValidators() {
return Sets.difference(validators.getPublicKeys(), validatorIndexesByPublicKey.keySet());
return Sets.difference(ownedValidators.getPublicKeys(), validatorIndexesByPublicKey.keySet());
}

private void logNewValidatorIndices(final Map<BLSPublicKey, Integer> knownValidators) {
Expand All @@ -100,15 +101,23 @@ public Optional<Integer> getValidatorIndex(final BLSPublicKey publicKey) {
return Optional.ofNullable(validatorIndexesByPublicKey.get(publicKey));
}

public SafeFuture<Collection<Integer>> getValidatorIndices(
final Collection<BLSPublicKey> publicKeys) {
public SafeFuture<Collection<Integer>> getValidatorIndices() {
// Wait for at least one successful load of validator indices before attempting to read
return firstSuccessfulRequest.thenApply(
__ ->
publicKeys.stream().flatMap(key -> getValidatorIndex(key).stream()).collect(toList()));
ownedValidators.getActiveValidators().stream()
.flatMap(validator -> getValidatorIndex(validator.getPublicKey()).stream())
.collect(toList()));
}

public SafeFuture<Map<BLSPublicKey, Integer>> getValidatorIndexesByPublicKey() {
return firstSuccessfulRequest.thenApply(__ -> unmodifiableMap(validatorIndexesByPublicKey));
// Wait for at least one successful load of validator indices before attempting to read
return firstSuccessfulRequest.thenApply(
__ ->
ownedValidators.getActiveValidators().stream()
.map(Validator::getPublicKey)
.collect(
Collectors.<BLSPublicKey, BLSPublicKey, Integer>toMap(
Function.identity(), validatorIndexesByPublicKey::get)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public abstract class AbstractDutySchedulerTest {

@BeforeEach
public void setUp() {
when(validatorIndexProvider.getValidatorIndices(VALIDATOR_KEYS))
when(validatorIndexProvider.getValidatorIndices())
.thenReturn(SafeFuture.completedFuture(VALIDATOR_INDICES));
final SafeFuture<BLSSignature> rejectAggregationSignature =
SafeFuture.failedFuture(new UnsupportedOperationException("This test ignores aggregation"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class AttestationDutyLoaderTest {

@BeforeEach
void setUp() {
when(validatorIndexProvider.getValidatorIndices(any()))
when(validatorIndexProvider.getValidatorIndices())
.thenReturn(SafeFuture.completedFuture(VALIDATOR_INDICES));
when(forkProvider.getForkInfo(any())).thenReturn(SafeFuture.completedFuture(forkInfo));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class SyncCommitteeDutyLoaderTest {

@BeforeEach
void setUp() {
when(validatorIndexProvider.getValidatorIndices(validators.getPublicKeys()))
when(validatorIndexProvider.getValidatorIndices())
.thenReturn(SafeFuture.completedFuture(validatorIndices));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void shouldWaitForFirstSuccessfulRequestBeforeLookingUpValidatorIndices() {
final SafeFuture<Map<BLSPublicKey, Integer>> requestResult = new SafeFuture<>();
when(validatorApiChannel.getValidatorIndices(Set.of(key1))).thenReturn(requestResult);

final SafeFuture<Collection<Integer>> result = provider.getValidatorIndices(List.of(key1));
final SafeFuture<Collection<Integer>> result = provider.getValidatorIndices();
assertThat(result).isNotDone();

provider.lookupValidators();
Expand Down

0 comments on commit 29db0c0

Please sign in to comment.