From 39be076c594ebc64ff84c34c0376368c2265487e Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 31 Jan 2022 10:51:46 +0100 Subject: [PATCH] better concurrency, simplification of ProposerConfigLoader to be URL only --- .../client/BeaconProposerPreparer.java | 12 ++-- .../AbstractProposerConfigProvider.java | 62 ++++++++++--------- .../FileProposerConfigProvider.java | 37 ----------- .../ProposerConfigProvider.java | 14 +++-- .../loader/ProposerConfigLoader.java | 12 +--- .../ProposerConfigProviderTest.java | 31 ++++++---- .../loader/ProposerConfigLoaderTest.java | 8 --- 7 files changed, 67 insertions(+), 109 deletions(-) delete mode 100644 validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/FileProposerConfigProvider.java diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java index 6290fe2ca5b..0802fe3e329 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java @@ -67,21 +67,19 @@ public void onSlot(UInt64 slot) { validatorIndexProvider .getValidatorIndexesByPublicKey() - .thenApply( - blsPublicKeyIntegerMap -> + .thenCompose( + publicKeyToIndex -> proposerConfigFuture .thenApply( proposerConfig -> - buildBeaconPreparableProposerList( - proposerConfig, blsPublicKeyIntegerMap)) + buildBeaconPreparableProposerList(proposerConfig, publicKeyToIndex)) .exceptionally( throwable -> { LOG.warn( "An error occurred while obtaining proposer config", throwable); return buildBeaconPreparableProposerList( - Optional.empty(), blsPublicKeyIntegerMap); - }) - .join()) + Optional.empty(), publicKeyToIndex); + })) .thenAccept(validatorApiChannel::prepareBeaconProposer) .finish(VALIDATOR_LOGGER::beaconProposerPreparationFailed); } diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/AbstractProposerConfigProvider.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/AbstractProposerConfigProvider.java index fff3663aca7..0a92374b516 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/AbstractProposerConfigProvider.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/AbstractProposerConfigProvider.java @@ -15,7 +15,6 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.infrastructure.async.AsyncRunner; @@ -30,7 +29,7 @@ public abstract class AbstractProposerConfigProvider implements ProposerConfigPr private final AsyncRunner asyncRunner; protected final ProposerConfigLoader proposerConfigLoader; private Optional lastProposerConfig = Optional.empty(); - private final AtomicBoolean requestInProgress = new AtomicBoolean(false); + private Optional>> futureProposerConfig = Optional.empty(); AbstractProposerConfigProvider( final AsyncRunner asyncRunner, @@ -42,42 +41,47 @@ public abstract class AbstractProposerConfigProvider implements ProposerConfigPr } @Override - public SafeFuture> getProposerConfig() { + public synchronized SafeFuture> getProposerConfig() { if (lastProposerConfig.isPresent() && !refresh) { return SafeFuture.completedFuture(lastProposerConfig); } - if (!requestInProgress.compareAndSet(false, true)) { + if (futureProposerConfig.isPresent()) { if (lastProposerConfig.isPresent()) { LOG.warn("A proposer config load is in progress, providing last loaded config"); return SafeFuture.completedFuture(lastProposerConfig); } - return SafeFuture.failedFuture( - new RuntimeException( - "A proposer config load is in progress and there is no previously loaded config")); + return futureProposerConfig.get(); } - - return asyncRunner - .runAsync( - () -> { - lastProposerConfig = Optional.of(internalGetProposerConfig()); - return lastProposerConfig; - }) - .orTimeout(30, TimeUnit.SECONDS) - .exceptionally( - throwable -> { - if (lastProposerConfig.isPresent()) { - LOG.warn( - "An error occurred while obtaining config, providing last loaded config", - throwable); - return lastProposerConfig; - } - throw new RuntimeException( - "An error occurred while obtaining config and there is no previously loaded config", - throwable); - }) - .thenPeek(__ -> LOG.info("proposer config successfully loaded")) - .alwaysRun(() -> requestInProgress.set(false)); + futureProposerConfig = + Optional.of( + asyncRunner + .runAsync( + () -> { + lastProposerConfig = Optional.of(internalGetProposerConfig()); + return lastProposerConfig; + }) + .orTimeout(30, TimeUnit.SECONDS) + .exceptionally( + throwable -> { + if (lastProposerConfig.isPresent()) { + LOG.warn( + "An error occurred while obtaining config, providing last loaded config", + throwable); + return lastProposerConfig; + } + throw new RuntimeException( + "An error occurred while obtaining config and there is no previously loaded config", + throwable); + }) + .thenPeek(__ -> LOG.info("proposer config successfully loaded")) + .alwaysRun( + () -> { + synchronized (this) { + futureProposerConfig = Optional.empty(); + } + })); + return futureProposerConfig.get(); } protected abstract ProposerConfig internalGetProposerConfig(); diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/FileProposerConfigProvider.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/FileProposerConfigProvider.java deleted file mode 100644 index 4a22fcf7011..00000000000 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/FileProposerConfigProvider.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2022 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.validator.client.proposerconfig; - -import java.io.File; -import tech.pegasys.teku.infrastructure.async.AsyncRunner; -import tech.pegasys.teku.validator.client.ProposerConfig; -import tech.pegasys.teku.validator.client.proposerconfig.loader.ProposerConfigLoader; - -public class FileProposerConfigProvider extends AbstractProposerConfigProvider { - private final File source; - - FileProposerConfigProvider( - final AsyncRunner asyncRunner, - final boolean refresh, - final ProposerConfigLoader proposerConfigLoader, - final File source) { - super(asyncRunner, refresh, proposerConfigLoader); - this.source = source; - } - - @Override - protected ProposerConfig internalGetProposerConfig() { - return proposerConfigLoader.getProposerConfig(source); - } -} diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/ProposerConfigProvider.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/ProposerConfigProvider.java index cdc1cdfa715..6c82a5d2534 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/ProposerConfigProvider.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/ProposerConfigProvider.java @@ -32,14 +32,18 @@ static ProposerConfigProvider create( final Optional source) { if (source.isPresent()) { + URL sourceUrl; try { - URL sourceUrl = new URL(source.get()); + sourceUrl = new URL(source.get()); return new UrlProposerConfigProvider(asyncRunner, refresh, proposerConfigLoader, sourceUrl); - } catch (MalformedURLException e) { - File sourceFile = new File(source.get()); - return new FileProposerConfigProvider( - asyncRunner, refresh, proposerConfigLoader, sourceFile); + } catch (MalformedURLException e1) { + try { + sourceUrl = new File(source.get()).toURI().toURL(); + } catch (MalformedURLException e2) { + throw new RuntimeException("Unable to translate file to URL", e2); + } } + return new UrlProposerConfigProvider(asyncRunner, refresh, proposerConfigLoader, sourceUrl); } return NOOP; } diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/loader/ProposerConfigLoader.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/loader/ProposerConfigLoader.java index b50b460c286..54ca67f4202 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/loader/ProposerConfigLoader.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/proposerconfig/loader/ProposerConfigLoader.java @@ -14,7 +14,6 @@ package tech.pegasys.teku.validator.client.proposerconfig.loader; import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.File; import java.io.IOException; import java.net.URL; import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException; @@ -33,22 +32,13 @@ public ProposerConfigLoader(final ObjectMapper objectMapper) { this.objectMapper = objectMapper; } - public ProposerConfig getProposerConfig(final File source) { - try { - return objectMapper.readValue(source, ProposerConfig.class); - } catch (IOException ex) { - throw new InvalidConfigurationException( - "Failed to load proposer config from file: " + source, ex); - } - } - public ProposerConfig getProposerConfig(final URL source) { try { return objectMapper.readValue(source, ProposerConfig.class); } catch (IOException ex) { throw new InvalidConfigurationException( - "Failed to load proposer config from URL: " + "Failed to load proposer config from: " + UrlSanitizer.sanitizePotentialUrl(source.toString()), ex); } diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/proposerconfig/ProposerConfigProviderTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/proposerconfig/ProposerConfigProviderTest.java index dc7db7965d6..41b612c032f 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/proposerconfig/ProposerConfigProviderTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/proposerconfig/ProposerConfigProviderTest.java @@ -19,6 +19,8 @@ import com.google.common.collect.ImmutableMap; import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; import java.util.Optional; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -44,7 +46,11 @@ public class ProposerConfigProviderTest { private final ProposerConfigProvider proposerConfigProvider = ProposerConfigProvider.create(asyncRunner, true, proposerConfigLoader, Optional.of(SOURCE)); - private final File sourceFile = new File(SOURCE); + private final URL sourceUrl; + + public ProposerConfigProviderTest() throws MalformedURLException { + sourceUrl = new File(SOURCE).toURI().toURL(); + } @Test void getProposerConfig_shouldReturnConfig() { @@ -53,7 +59,7 @@ void getProposerConfig_shouldReturnConfig() { assertThat(futureMaybeConfig).isNotCompleted(); - when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigA); + when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigA); asyncRunner.executeQueuedActions(); assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA)); @@ -64,7 +70,7 @@ void getProposerConfig_onErrorShouldThrowWhenNoLastConfigAvailable() { SafeFuture> futureMaybeConfig = proposerConfigProvider.getProposerConfig(); - when(proposerConfigLoader.getProposerConfig(sourceFile)) + when(proposerConfigLoader.getProposerConfig(sourceUrl)) .thenThrow(new RuntimeException("error")); asyncRunner.executeQueuedActions(); @@ -76,14 +82,14 @@ void getProposerConfig_onErrorShouldReturnLastConfigWhenLastConfigAvailable() { SafeFuture> futureMaybeConfig = proposerConfigProvider.getProposerConfig(); - when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigA); + when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigA); asyncRunner.executeQueuedActions(); assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA)); futureMaybeConfig = proposerConfigProvider.getProposerConfig(); - when(proposerConfigLoader.getProposerConfig(sourceFile)) + when(proposerConfigLoader.getProposerConfig(sourceUrl)) .thenThrow(new RuntimeException("error")); asyncRunner.executeQueuedActions(); @@ -91,15 +97,16 @@ void getProposerConfig_onErrorShouldReturnLastConfigWhenLastConfigAvailable() { } @Test - void getProposerConfig_onConcurrentCallsShouldThrowWhenNoLastConfigAvailable() { + void getProposerConfig_onConcurrentCallsShouldMergeFuturesWhenNoLastConfigAvailable() { SafeFuture> futureMaybeConfig = proposerConfigProvider.getProposerConfig(); SafeFuture> futureMaybeConfig2 = proposerConfigProvider.getProposerConfig(); - assertThat(futureMaybeConfig2).isCompletedExceptionally(); + assertThat(futureMaybeConfig2).isEqualTo(futureMaybeConfig); + assertThat(futureMaybeConfig2).isNotCompleted(); - when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigA); + when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigA); asyncRunner.executeQueuedActions(); assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA)); @@ -110,7 +117,7 @@ void getProposerConfig_onConcurrentCallsShouldReturnLastConfigWhenLastConfigAvai SafeFuture> futureMaybeConfig = proposerConfigProvider.getProposerConfig(); - when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigA); + when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigA); asyncRunner.executeQueuedActions(); assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA)); @@ -121,7 +128,7 @@ void getProposerConfig_onConcurrentCallsShouldReturnLastConfigWhenLastConfigAvai assertThat(futureMaybeConfig2).isCompletedWithValue(Optional.of(proposerConfigA)); - when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigB); + when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigB); asyncRunner.executeQueuedActions(); assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigB)); @@ -138,12 +145,12 @@ void getProposerConfig_shouldAlwaysReturnFirstValidConfigWhenRefreshIsFalse() { assertThat(futureMaybeConfig).isNotCompleted(); - when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigA); + when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigA); asyncRunner.executeQueuedActions(); assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA)); - when(proposerConfigLoader.getProposerConfig(sourceFile)).thenReturn(proposerConfigB); + when(proposerConfigLoader.getProposerConfig(sourceUrl)).thenReturn(proposerConfigB); futureMaybeConfig = proposerConfigProvider.getProposerConfig(); assertThat(futureMaybeConfig).isCompletedWithValue(Optional.of(proposerConfigA)); diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/proposerconfig/loader/ProposerConfigLoaderTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/proposerconfig/loader/ProposerConfigLoaderTest.java index 0db33ecefa9..07a241229a0 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/proposerconfig/loader/ProposerConfigLoaderTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/proposerconfig/loader/ProposerConfigLoaderTest.java @@ -17,7 +17,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.google.common.io.Resources; -import java.io.File; import java.net.URL; import java.util.Optional; import org.junit.jupiter.api.Test; @@ -35,13 +34,6 @@ void shouldLoadValidConfigFromUrl() { validateContent1(loader.getProposerConfig(resource)); } - @Test - void shouldLoadValidConfigFromFile() { - final URL resource = Resources.getResource("proposerConfigValid1.json"); - - validateContent1(loader.getProposerConfig(new File(resource.getPath()))); - } - @Test void shouldLoadConfigWithEmptyProposerConfig() { final URL resource = Resources.getResource("proposerConfigValid2.json");