Skip to content

Commit

Permalink
better concurrency, simplification of ProposerConfigLoader to be URL …
Browse files Browse the repository at this point in the history
…only
  • Loading branch information
tbenr committed Jan 31, 2022
1 parent f766d66 commit 39be076
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,19 @@ public void onSlot(UInt64 slot) {

validatorIndexProvider
.getValidatorIndexesByPublicKey()
.thenApply(
blsPublicKeyIntegerMap ->
.thenCompose(
publicKeyToIndex ->
proposerConfigFuture
.thenApply(
proposerConfig ->
buildBeaconPreparableProposerList(
proposerConfig, blsPublicKeyIntegerMap))
buildBeaconPreparableProposerList(proposerConfig, publicKeyToIndex))
.exceptionally(
throwable -> {
LOG.warn(
"An error occurred while obtaining proposer config", throwable);
return buildBeaconPreparableProposerList(
Optional.empty(), blsPublicKeyIntegerMap);
})
.join())
Optional.empty(), publicKeyToIndex);
}))
.thenAccept(validatorApiChannel::prepareBeaconProposer)
.finish(VALIDATOR_LOGGER::beaconProposerPreparationFailed);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
Expand All @@ -30,7 +29,7 @@ public abstract class AbstractProposerConfigProvider implements ProposerConfigPr
private final AsyncRunner asyncRunner;
protected final ProposerConfigLoader proposerConfigLoader;
private Optional<ProposerConfig> lastProposerConfig = Optional.empty();
private final AtomicBoolean requestInProgress = new AtomicBoolean(false);
private Optional<SafeFuture<Optional<ProposerConfig>>> futureProposerConfig = Optional.empty();

AbstractProposerConfigProvider(
final AsyncRunner asyncRunner,
Expand All @@ -42,42 +41,47 @@ public abstract class AbstractProposerConfigProvider implements ProposerConfigPr
}

@Override
public SafeFuture<Optional<ProposerConfig>> getProposerConfig() {
public synchronized SafeFuture<Optional<ProposerConfig>> getProposerConfig() {
if (lastProposerConfig.isPresent() && !refresh) {
return SafeFuture.completedFuture(lastProposerConfig);
}

if (!requestInProgress.compareAndSet(false, true)) {
if (futureProposerConfig.isPresent()) {
if (lastProposerConfig.isPresent()) {
LOG.warn("A proposer config load is in progress, providing last loaded config");
return SafeFuture.completedFuture(lastProposerConfig);
}
return SafeFuture.failedFuture(
new RuntimeException(
"A proposer config load is in progress and there is no previously loaded config"));
return futureProposerConfig.get();
}

return asyncRunner
.runAsync(
() -> {
lastProposerConfig = Optional.of(internalGetProposerConfig());
return lastProposerConfig;
})
.orTimeout(30, TimeUnit.SECONDS)
.exceptionally(
throwable -> {
if (lastProposerConfig.isPresent()) {
LOG.warn(
"An error occurred while obtaining config, providing last loaded config",
throwable);
return lastProposerConfig;
}
throw new RuntimeException(
"An error occurred while obtaining config and there is no previously loaded config",
throwable);
})
.thenPeek(__ -> LOG.info("proposer config successfully loaded"))
.alwaysRun(() -> requestInProgress.set(false));
futureProposerConfig =
Optional.of(
asyncRunner
.runAsync(
() -> {
lastProposerConfig = Optional.of(internalGetProposerConfig());
return lastProposerConfig;
})
.orTimeout(30, TimeUnit.SECONDS)
.exceptionally(
throwable -> {
if (lastProposerConfig.isPresent()) {
LOG.warn(
"An error occurred while obtaining config, providing last loaded config",
throwable);
return lastProposerConfig;
}
throw new RuntimeException(
"An error occurred while obtaining config and there is no previously loaded config",
throwable);
})
.thenPeek(__ -> LOG.info("proposer config successfully loaded"))
.alwaysRun(
() -> {
synchronized (this) {
futureProposerConfig = Optional.empty();
}
}));
return futureProposerConfig.get();
}

protected abstract ProposerConfig internalGetProposerConfig();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@ static ProposerConfigProvider create(
final Optional<String> source) {

if (source.isPresent()) {
URL sourceUrl;
try {
URL sourceUrl = new URL(source.get());
sourceUrl = new URL(source.get());
return new UrlProposerConfigProvider(asyncRunner, refresh, proposerConfigLoader, sourceUrl);
} catch (MalformedURLException e) {
File sourceFile = new File(source.get());
return new FileProposerConfigProvider(
asyncRunner, refresh, proposerConfigLoader, sourceFile);
} catch (MalformedURLException e1) {
try {
sourceUrl = new File(source.get()).toURI().toURL();
} catch (MalformedURLException e2) {
throw new RuntimeException("Unable to translate file to URL", e2);
}
}
return new UrlProposerConfigProvider(asyncRunner, refresh, proposerConfigLoader, sourceUrl);
}
return NOOP;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package tech.pegasys.teku.validator.client.proposerconfig.loader;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException;
Expand All @@ -33,22 +32,13 @@ public ProposerConfigLoader(final ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

public ProposerConfig getProposerConfig(final File source) {
try {
return objectMapper.readValue(source, ProposerConfig.class);
} catch (IOException ex) {
throw new InvalidConfigurationException(
"Failed to load proposer config from file: " + source, ex);
}
}

public ProposerConfig getProposerConfig(final URL source) {
try {
return objectMapper.readValue(source, ProposerConfig.class);
} catch (IOException ex) {

throw new InvalidConfigurationException(
"Failed to load proposer config from URL: "
"Failed to load proposer config from: "
+ UrlSanitizer.sanitizePotentialUrl(source.toString()),
ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
Expand All @@ -44,7 +46,11 @@ public class ProposerConfigProviderTest {

private final ProposerConfigProvider proposerConfigProvider =
ProposerConfigProvider.create(asyncRunner, true, proposerConfigLoader, Optional.of(SOURCE));
private final File sourceFile = new File(SOURCE);
private final URL sourceUrl;

public ProposerConfigProviderTest() throws MalformedURLException {
sourceUrl = new File(SOURCE).toURI().toURL();
}

@Test
void getProposerConfig_shouldReturnConfig() {
Expand All @@ -53,7 +59,7 @@ void getProposerConfig_shouldReturnConfig() {

assertThat(futureMaybeConfig).isNotCompleted();

when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigA);
when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigA);
asyncRunner.executeQueuedActions();

assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA));
Expand All @@ -64,7 +70,7 @@ void getProposerConfig_onErrorShouldThrowWhenNoLastConfigAvailable() {
SafeFuture<Optional<ProposerConfig>> futureMaybeConfig =
proposerConfigProvider.getProposerConfig();

when(proposerConfigLoader.getProposerConfig(sourceFile))
when(proposerConfigLoader.getProposerConfig(sourceUrl))
.thenThrow(new RuntimeException("error"));
asyncRunner.executeQueuedActions();

Expand All @@ -76,30 +82,31 @@ void getProposerConfig_onErrorShouldReturnLastConfigWhenLastConfigAvailable() {
SafeFuture<Optional<ProposerConfig>> futureMaybeConfig =
proposerConfigProvider.getProposerConfig();

when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigA);
when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigA);
asyncRunner.executeQueuedActions();

assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA));

futureMaybeConfig = proposerConfigProvider.getProposerConfig();

when(proposerConfigLoader.getProposerConfig(sourceFile))
when(proposerConfigLoader.getProposerConfig(sourceUrl))
.thenThrow(new RuntimeException("error"));
asyncRunner.executeQueuedActions();

assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA));
}

@Test
void getProposerConfig_onConcurrentCallsShouldThrowWhenNoLastConfigAvailable() {
void getProposerConfig_onConcurrentCallsShouldMergeFuturesWhenNoLastConfigAvailable() {
SafeFuture<Optional<ProposerConfig>> futureMaybeConfig =
proposerConfigProvider.getProposerConfig();

SafeFuture<Optional<ProposerConfig>> futureMaybeConfig2 =
proposerConfigProvider.getProposerConfig();
assertThat(futureMaybeConfig2).isCompletedExceptionally();
assertThat(futureMaybeConfig2).isEqualTo(futureMaybeConfig);
assertThat(futureMaybeConfig2).isNotCompleted();

when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigA);
when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigA);
asyncRunner.executeQueuedActions();

assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA));
Expand All @@ -110,7 +117,7 @@ void getProposerConfig_onConcurrentCallsShouldReturnLastConfigWhenLastConfigAvai
SafeFuture<Optional<ProposerConfig>> futureMaybeConfig =
proposerConfigProvider.getProposerConfig();

when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigA);
when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigA);
asyncRunner.executeQueuedActions();

assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA));
Expand All @@ -121,7 +128,7 @@ void getProposerConfig_onConcurrentCallsShouldReturnLastConfigWhenLastConfigAvai

assertThat(futureMaybeConfig2).isCompletedWithValue(Optional.of(proposerConfigA));

when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigB);
when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigB);
asyncRunner.executeQueuedActions();

assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigB));
Expand All @@ -138,12 +145,12 @@ void getProposerConfig_shouldAlwaysReturnFirstValidConfigWhenRefreshIsFalse() {

assertThat(futureMaybeConfig).isNotCompleted();

when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigA);
when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigA);
asyncRunner.executeQueuedActions();

assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA));

when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigB);
when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigB);

futureMaybeConfig = proposerConfigProvider.getProposerConfig();
assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.google.common.io.Resources;
import java.io.File;
import java.net.URL;
import java.util.Optional;
import org.junit.jupiter.api.Test;
Expand All @@ -35,13 +34,6 @@ void shouldLoadValidConfigFromUrl() {
validateContent1(loader.getProposerConfig(resource));
}

@Test
void shouldLoadValidConfigFromFile() {
final URL resource = Resources.getResource("proposerConfigValid1.json");

validateContent1(loader.getProposerConfig(new File(resource.getPath())));
}

@Test
void shouldLoadConfigWithEmptyProposerConfig() {
final URL resource = Resources.getResource("proposerConfigValid2.json");
Expand Down

0 comments on commit 39be076

Please sign in to comment.