diff --git a/CHANGELOG.md b/CHANGELOG.md index 50b2f027a38..75b4f2ac82b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ For information on changes in released versions of Teku, see the [releases page] - The `voluntary-exit` subcommand can restrict the exit to a specific list of validators public keys using the option `--validator-public-keys`. Example: `teku voluntary-exit --beacon-node-api-endpoint=[,...]... --data-validator-path= --include-keymanager-keys= --validator-keys=: | : --validator-public-keys=[,...]...` To include validator keys managed via keymanager APIs, the option `--include-keymanager-keys` could be set to `true` (The default value is set to `false`) +- Throttle signing of validator registrations when using an external signer ### Bug Fixes - Filter out unknown validators when sending validator registrations to the builder network diff --git a/beacon/pow/src/main/java/tech/pegasys/teku/beacon/pow/ThrottlingEth1Provider.java b/beacon/pow/src/main/java/tech/pegasys/teku/beacon/pow/ThrottlingEth1Provider.java index 025cffdba67..d3103b8c24a 100644 --- a/beacon/pow/src/main/java/tech/pegasys/teku/beacon/pow/ThrottlingEth1Provider.java +++ b/beacon/pow/src/main/java/tech/pegasys/teku/beacon/pow/ThrottlingEth1Provider.java @@ -38,7 +38,7 @@ public ThrottlingEth1Provider( final MetricsSystem metricsSystem) { this.delegate = delegate; taskQueue = - new ThrottlingTaskQueue( + ThrottlingTaskQueue.create( maximumConcurrentRequests, metricsSystem, TekuMetricCategory.BEACON, diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/ThrottlingBuilderClient.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/ThrottlingBuilderClient.java index 709e7c98cbc..a60946fd91e 100644 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/ThrottlingBuilderClient.java +++ b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/ThrottlingBuilderClient.java @@ -38,7 +38,7 @@ public ThrottlingBuilderClient( final MetricsSystem metricsSystem) { this.delegate = delegate; taskQueue = - new ThrottlingTaskQueue( + ThrottlingTaskQueue.create( maximumConcurrentRequests, metricsSystem, TekuMetricCategory.BEACON, diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/ThrottlingExecutionEngineClient.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/ThrottlingExecutionEngineClient.java index c1a40f454e0..2269e0e2eaa 100644 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/ThrottlingExecutionEngineClient.java +++ b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/ThrottlingExecutionEngineClient.java @@ -39,7 +39,7 @@ public ThrottlingExecutionEngineClient( final MetricsSystem metricsSystem) { this.delegate = delegate; taskQueue = - new ThrottlingTaskQueue( + ThrottlingTaskQueue.create( maximumConcurrentRequests, metricsSystem, TekuMetricCategory.BEACON, diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java index 2a871d72bd7..9b73551b4c1 100644 --- a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.infrastructure.async; +import com.google.common.annotations.VisibleForTesting; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Supplier; @@ -20,42 +21,67 @@ import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; public class ThrottlingTaskQueue { + + protected final Queue queuedTasks = new ConcurrentLinkedQueue<>(); + private final int maximumConcurrentTasks; - private final Queue queuedTasks = new ConcurrentLinkedQueue<>(); + private int inflightTaskCount = 0; - public ThrottlingTaskQueue( + public static ThrottlingTaskQueue create( final int maximumConcurrentTasks, final MetricsSystem metricsSystem, final TekuMetricCategory metricCategory, final String metricName) { - this.maximumConcurrentTasks = maximumConcurrentTasks; - + final ThrottlingTaskQueue taskQueue = new ThrottlingTaskQueue(maximumConcurrentTasks); metricsSystem.createGauge( - metricCategory, metricName, "Number of tasks queued", queuedTasks::size); + metricCategory, metricName, "Number of tasks queued", taskQueue.queuedTasks::size); + return taskQueue; + } + + protected ThrottlingTaskQueue(final int maximumConcurrentTasks) { + this.maximumConcurrentTasks = maximumConcurrentTasks; } public SafeFuture queueTask(final Supplier> request) { - final SafeFuture future = new SafeFuture<>(); - queuedTasks.add( - () -> { - final SafeFuture requestFuture = request.get(); - requestFuture.propagateTo(future); - requestFuture.always(this::taskComplete); - }); + final SafeFuture target = new SafeFuture<>(); + final Runnable taskToQueue = getTaskToQueue(request, target); + queuedTasks.add(taskToQueue); processQueuedTasks(); - return future; + return target; } - private synchronized void taskComplete() { - inflightTaskCount--; - processQueuedTasks(); + protected Runnable getTaskToQueue( + final Supplier> request, final SafeFuture target) { + return () -> { + final SafeFuture requestFuture = request.get(); + requestFuture.propagateTo(target); + requestFuture.always(this::taskComplete); + }; } - private synchronized void processQueuedTasks() { - while (inflightTaskCount < maximumConcurrentTasks && !queuedTasks.isEmpty()) { + protected Runnable getTaskToRun() { + return queuedTasks.remove(); + } + + protected synchronized void processQueuedTasks() { + while (inflightTaskCount < maximumConcurrentTasks && getQueuedTasksCount() > 0) { inflightTaskCount++; - queuedTasks.remove().run(); + getTaskToRun().run(); } } + + protected int getQueuedTasksCount() { + return queuedTasks.size(); + } + + @VisibleForTesting + int getInflightTaskCount() { + return inflightTaskCount; + } + + private synchronized void taskComplete() { + inflightTaskCount--; + processQueuedTasks(); + } } diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueWithPriority.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueWithPriority.java new file mode 100644 index 00000000000..9ab91e4ba57 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueWithPriority.java @@ -0,0 +1,69 @@ +/* + * Copyright ConsenSys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Supplier; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.metrics.LabelledGauge; +import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; + +public class ThrottlingTaskQueueWithPriority extends ThrottlingTaskQueue { + + private final Queue queuedPrioritizedTasks = new ConcurrentLinkedQueue<>(); + + public static ThrottlingTaskQueueWithPriority create( + final int maximumConcurrentTasks, + final MetricsSystem metricsSystem, + final TekuMetricCategory metricCategory, + final String metricName) { + final ThrottlingTaskQueueWithPriority taskQueue = + new ThrottlingTaskQueueWithPriority(maximumConcurrentTasks); + final LabelledGauge taskQueueGauge = + metricsSystem.createLabelledGauge( + metricCategory, metricName, "Number of tasks queued", "priority"); + taskQueueGauge.labels(taskQueue.queuedTasks::size, "normal"); + taskQueueGauge.labels(taskQueue.queuedPrioritizedTasks::size, "high"); + return taskQueue; + } + + private ThrottlingTaskQueueWithPriority(final int maximumConcurrentTasks) { + super(maximumConcurrentTasks); + } + + public SafeFuture queueTask( + final Supplier> request, final boolean prioritize) { + if (!prioritize) { + return queueTask(request); + } + final SafeFuture target = new SafeFuture<>(); + final Runnable taskToQueue = getTaskToQueue(request, target); + queuedPrioritizedTasks.add(taskToQueue); + processQueuedTasks(); + return target; + } + + @Override + protected Runnable getTaskToRun() { + return !queuedPrioritizedTasks.isEmpty() + ? queuedPrioritizedTasks.remove() + : queuedTasks.remove(); + } + + @Override + protected int getQueuedTasksCount() { + return queuedTasks.size() + queuedPrioritizedTasks.size(); + } +} diff --git a/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueTest.java b/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueTest.java new file mode 100644 index 00000000000..b3671f9a376 --- /dev/null +++ b/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueTest.java @@ -0,0 +1,64 @@ +/* + * Copyright ConsenSys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; +import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; + +public class ThrottlingTaskQueueTest { + + private static final int MAXIMUM_CONCURRENT_TASKS = 3; + + private final StubMetricsSystem stubMetricsSystem = new StubMetricsSystem(); + + private final StubAsyncRunner stubAsyncRunner = new StubAsyncRunner(); + + private final ThrottlingTaskQueue taskQueue = + ThrottlingTaskQueue.create( + MAXIMUM_CONCURRENT_TASKS, stubMetricsSystem, TekuMetricCategory.BEACON, "test_metric"); + + @Test + public void throttlesRequests() { + final List> requests = + IntStream.range(0, 100) + .mapToObj( + element -> { + final SafeFuture request = + stubAsyncRunner.runAsync( + () -> { + assertThat(taskQueue.getInflightTaskCount()) + .isLessThanOrEqualTo(MAXIMUM_CONCURRENT_TASKS); + }); + return taskQueue.queueTask(() -> request); + }) + .collect(Collectors.toList()); + + assertThat(getQueuedTasksGaugeValue()).isEqualTo(97); + assertThat(taskQueue.getInflightTaskCount()).isEqualTo(3); + + stubAsyncRunner.executeQueuedActions(); + + requests.forEach(request -> assertThat(request).isCompleted()); + } + + private double getQueuedTasksGaugeValue() { + return stubMetricsSystem.getGauge(TekuMetricCategory.BEACON, "test_metric").getValue(); + } +} diff --git a/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueWithPriorityTest.java b/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueWithPriorityTest.java new file mode 100644 index 00000000000..0c05a94f7fd --- /dev/null +++ b/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueWithPriorityTest.java @@ -0,0 +1,111 @@ +/* + * Copyright ConsenSys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; +import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; + +public class ThrottlingTaskQueueWithPriorityTest { + + private static final int MAXIMUM_CONCURRENT_TASKS = 3; + + private final StubMetricsSystem stubMetricsSystem = new StubMetricsSystem(); + + private final StubAsyncRunner stubAsyncRunner = new StubAsyncRunner(); + + private final ThrottlingTaskQueueWithPriority taskQueue = + ThrottlingTaskQueueWithPriority.create( + MAXIMUM_CONCURRENT_TASKS, stubMetricsSystem, TekuMetricCategory.BEACON, "test_metric"); + + @Test + public void throttlesRequests() { + final List> requests = + IntStream.range(0, 100) + .mapToObj( + element -> { + final SafeFuture request = + stubAsyncRunner.runAsync( + () -> { + assertThat(taskQueue.getInflightTaskCount()) + .isLessThanOrEqualTo(MAXIMUM_CONCURRENT_TASKS); + }); + // prioritize 1/3 of requests + if (element % 3 == 0) { + return taskQueue.queueTask(() -> request, true); + } + return taskQueue.queueTask(() -> request); + }) + .collect(Collectors.toList()); + + assertThat(getQueuedTasksGaugeValue(true)).isEqualTo(33); + assertThat(getQueuedTasksGaugeValue(false)).isEqualTo(64); + assertThat(taskQueue.getInflightTaskCount()).isEqualTo(3); + + stubAsyncRunner.executeQueuedActions(); + + requests.forEach(request -> assertThat(request).isCompleted()); + } + + @Test + @SuppressWarnings("FutureReturnValueIgnored") + public void prioritizesRequests() { + final SafeFuture initialRequest = new SafeFuture<>(); + final SafeFuture prioritizedRequest = new SafeFuture<>(); + final SafeFuture normalRequest = new SafeFuture<>(); + + final AtomicBoolean priorityFirst = new AtomicBoolean(false); + + // fill queue + IntStream.range(0, MAXIMUM_CONCURRENT_TASKS) + .forEach(__ -> taskQueue.queueTask(() -> initialRequest)); + final SafeFuture assertion = + taskQueue.queueTask( + () -> { + // make sure the prioritized request is ran first + // even though It has been queued after this one + assertThat(priorityFirst).isTrue(); + return normalRequest; + }); + taskQueue.queueTask( + () -> { + priorityFirst.set(true); + return prioritizedRequest; + }, + true); + + assertThat(getQueuedTasksGaugeValue(true)).isEqualTo(1); + assertThat(getQueuedTasksGaugeValue(false)).isEqualTo(1); + assertThat(taskQueue.getInflightTaskCount()).isEqualTo(3); + + initialRequest.complete(null); + normalRequest.complete(null); + prioritizedRequest.complete(null); + + assertThat(assertion).isCompleted(); + } + + private double getQueuedTasksGaugeValue(final boolean priority) { + return stubMetricsSystem + .getLabelledGauge(TekuMetricCategory.BEACON, "test_metric") + .getValue(priority ? "high" : "normal") + .orElseThrow(); + } +} diff --git a/teku/src/main/java/tech/pegasys/teku/cli/options/ValidatorKeysOptions.java b/teku/src/main/java/tech/pegasys/teku/cli/options/ValidatorKeysOptions.java index 2c23b0c9189..5d280a2a18a 100644 --- a/teku/src/main/java/tech/pegasys/teku/cli/options/ValidatorKeysOptions.java +++ b/teku/src/main/java/tech/pegasys/teku/cli/options/ValidatorKeysOptions.java @@ -98,7 +98,8 @@ public class ValidatorKeysOptions { @CommandLine.Option( names = {"--Xvalidators-external-signer-concurrent-limit"}, paramLabel = "", - description = "The maximum number of concurrent background requests to make to the signer.", + description = + "The maximum number of concurrent background requests to make to the signer. This only applies for aggregation slot and validator registration requests.", hidden = true, arity = "1") private int validatorExternalSignerConcurrentRequestLimit = diff --git a/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerAltairIntegrationTest.java b/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerAltairIntegrationTest.java index 882b2def77e..dae917bb5a6 100644 --- a/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerAltairIntegrationTest.java +++ b/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerAltairIntegrationTest.java @@ -39,7 +39,7 @@ import tech.pegasys.teku.bls.BLSSignature; import tech.pegasys.teku.bls.BLSTestUtil; import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue; +import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueueWithPriority; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -82,8 +82,9 @@ public class ExternalSignerAltairIntegrationTest { private final ForkInfo forkInfo = dataStructureUtil.randomForkInfo(); private static final BLSKeyPair KEYPAIR = BLSTestUtil.randomKeyPair(1234); private final StubMetricsSystem metricsSystem = new StubMetricsSystem(); - private final ThrottlingTaskQueue queue = - new ThrottlingTaskQueue(8, metricsSystem, TekuMetricCategory.VALIDATOR, "externalSignerTest"); + private final ThrottlingTaskQueueWithPriority queue = + ThrottlingTaskQueueWithPriority.create( + 8, metricsSystem, TekuMetricCategory.VALIDATOR, "externalSignerTest"); private ClientAndServer client; private ExternalSigner externalSigner; diff --git a/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerBellatrixBlockSigningIntegrationTest.java b/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerBellatrixBlockSigningIntegrationTest.java index e4479c06d7c..520ae9216c8 100644 --- a/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerBellatrixBlockSigningIntegrationTest.java +++ b/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerBellatrixBlockSigningIntegrationTest.java @@ -35,7 +35,7 @@ import tech.pegasys.teku.bls.BLSKeyPair; import tech.pegasys.teku.bls.BLSSignature; import tech.pegasys.teku.bls.BLSTestUtil; -import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue; +import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueueWithPriority; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.spec.Spec; @@ -57,8 +57,9 @@ public class ExternalSignerBellatrixBlockSigningIntegrationTest { private final ForkInfo fork = dataStructureUtil.randomForkInfo(); private static final BLSKeyPair KEYPAIR = BLSTestUtil.randomKeyPair(1234); private final StubMetricsSystem metricsSystem = new StubMetricsSystem(); - private final ThrottlingTaskQueue queue = - new ThrottlingTaskQueue(8, metricsSystem, TekuMetricCategory.VALIDATOR, "externalSignerTest"); + private final ThrottlingTaskQueueWithPriority queue = + ThrottlingTaskQueueWithPriority.create( + 8, metricsSystem, TekuMetricCategory.VALIDATOR, "externalSignerTest"); private ClientAndServer client; private ExternalSigner externalSigner; diff --git a/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerIntegrationTest.java b/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerIntegrationTest.java index 0aff33605b0..289236baa9f 100644 --- a/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerIntegrationTest.java +++ b/validator/client/src/integration-test/java/tech/pegasys/teku/validator/client/signer/ExternalSignerIntegrationTest.java @@ -43,7 +43,7 @@ import tech.pegasys.teku.bls.BLSSignature; import tech.pegasys.teku.bls.BLSTestUtil; import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue; +import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueueWithPriority; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -68,8 +68,9 @@ public class ExternalSignerIntegrationTest { private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); private final ForkInfo fork = dataStructureUtil.randomForkInfo(); private final StubMetricsSystem metricsSystem = new StubMetricsSystem(); - private final ThrottlingTaskQueue queue = - new ThrottlingTaskQueue(8, metricsSystem, TekuMetricCategory.VALIDATOR, "externalSignerTest"); + private final ThrottlingTaskQueueWithPriority queue = + ThrottlingTaskQueueWithPriority.create( + 8, metricsSystem, TekuMetricCategory.VALIDATOR, "externalSignerTest"); private final SigningRootUtil signingRootUtil = new SigningRootUtil(spec); private ClientAndServer client; diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorProvider.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorProvider.java index 29d92733259..b7988d5812c 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorProvider.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorProvider.java @@ -20,7 +20,7 @@ import java.util.function.Supplier; import org.hyperledger.besu.plugin.services.MetricsSystem; import tech.pegasys.teku.bls.BLSPublicKey; -import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue; +import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueueWithPriority; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.signatures.Signer; import tech.pegasys.teku.validator.client.signer.ExternalSigner; @@ -32,7 +32,7 @@ class ExternalValidatorProvider implements ValidatorSource.ValidatorProvider { private final URL externalSignerUrl; private final BLSPublicKey publicKey; private final Duration externalSignerTimeout; - private final ThrottlingTaskQueue externalSignerTaskQueue; + private final ThrottlingTaskQueueWithPriority externalSignerTaskQueue; private final MetricsSystem metricsSystem; private final boolean readOnly; @@ -42,7 +42,7 @@ class ExternalValidatorProvider implements ValidatorSource.ValidatorProvider { final URL externalSignerUrl, final BLSPublicKey publicKey, final Duration externalSignerTimeout, - final ThrottlingTaskQueue externalSignerTaskQueue, + final ThrottlingTaskQueueWithPriority externalSignerTaskQueue, final MetricsSystem metricsSystem, final boolean readOnly) { this.spec = spec; diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorSource.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorSource.java index ae3a99c7240..e640e8ec1b1 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorSource.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorSource.java @@ -35,7 +35,7 @@ import tech.pegasys.signers.bls.keystore.model.KeyStoreData; import tech.pegasys.teku.bls.BLSPublicKey; import tech.pegasys.teku.infrastructure.async.AsyncRunner; -import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue; +import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueueWithPriority; import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException; import tech.pegasys.teku.service.serviceutils.layout.DataDirLayout; import tech.pegasys.teku.spec.Spec; @@ -54,7 +54,7 @@ public class ExternalValidatorSource extends AbstractValidatorSource implements private final ValidatorConfig config; private final Supplier externalSignerHttpClientFactory; private final PublicKeyLoader publicKeyLoader; - private final ThrottlingTaskQueue externalSignerTaskQueue; + private final ThrottlingTaskQueueWithPriority externalSignerTaskQueue; private final MetricsSystem metricsSystem; private final Map externalValidatorSourceMap = new ConcurrentHashMap<>(); @@ -63,7 +63,7 @@ private ExternalValidatorSource( final ValidatorConfig config, final Supplier externalSignerHttpClientFactory, final PublicKeyLoader publicKeyLoader, - final ThrottlingTaskQueue externalSignerTaskQueue, + final ThrottlingTaskQueueWithPriority externalSignerTaskQueue, final MetricsSystem metricsSystem, final boolean readOnly, final Optional maybeDataDirLayout) { @@ -84,7 +84,7 @@ public static ExternalValidatorSource create( final PublicKeyLoader publicKeyLoader, final AsyncRunner asyncRunner, final boolean readOnly, - final ThrottlingTaskQueue externalSignerTaskQueue, + final ThrottlingTaskQueueWithPriority externalSignerTaskQueue, final Optional maybeDataDirLayout) { setupExternalSignerStatusLogging(config, externalSignerHttpClientFactory, asyncRunner); return new ExternalValidatorSource( diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ValidatorSourceFactory.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ValidatorSourceFactory.java index ff3dcab54df..ba2da0d69f8 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ValidatorSourceFactory.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/loader/ValidatorSourceFactory.java @@ -24,7 +24,7 @@ import org.apache.logging.log4j.Logger; import org.hyperledger.besu.plugin.services.MetricsSystem; import tech.pegasys.teku.infrastructure.async.AsyncRunner; -import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue; +import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueueWithPriority; import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.service.serviceutils.layout.DataDirLayout; import tech.pegasys.teku.spec.Spec; @@ -73,7 +73,7 @@ public class ValidatorSourceFactory { private final Optional maybeDataDir; private Optional mutableLocalValidatorSource = Optional.empty(); private Optional mutableExternalValidatorSource = Optional.empty(); - private ThrottlingTaskQueue externalSignerTaskQueue; + private ThrottlingTaskQueueWithPriority externalSignerTaskQueue; public ValidatorSourceFactory( final Spec spec, @@ -217,10 +217,10 @@ private ValidatorSource slashingProtected(final ValidatorSource validatorSource) return new SlashingProtectedValidatorSource(validatorSource, slashingProtector); } - private ThrottlingTaskQueue initializeExternalSignerTaskQueue() { + private ThrottlingTaskQueueWithPriority initializeExternalSignerTaskQueue() { if (externalSignerTaskQueue == null) { externalSignerTaskQueue = - new ThrottlingTaskQueue( + ThrottlingTaskQueueWithPriority.create( config.getValidatorExternalSignerConcurrentRequestLimit(), metricsSystem, TekuMetricCategory.VALIDATOR, diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/signer/ExternalSigner.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/signer/ExternalSigner.java index 9e421a629ae..ecb59d17536 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/signer/ExternalSigner.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/signer/ExternalSigner.java @@ -41,7 +41,7 @@ import tech.pegasys.teku.bls.BLSPublicKey; import tech.pegasys.teku.bls.BLSSignature; import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue; +import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueueWithPriority; import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.provider.JsonProvider; @@ -67,7 +67,7 @@ public class ExternalSigner implements Signer { private final Duration timeout; private final Spec spec; private final HttpClient httpClient; - private final ThrottlingTaskQueue taskQueue; + private final ThrottlingTaskQueueWithPriority taskQueue; private final SigningRootUtil signingRootUtil; private final Counter successCounter; @@ -80,7 +80,7 @@ public ExternalSigner( final URL signingServiceUrl, final BLSPublicKey blsPublicKey, final Duration timeout, - final ThrottlingTaskQueue taskQueue, + final ThrottlingTaskQueueWithPriority taskQueue, final MetricsSystem metricsSystem) { this.spec = spec; this.httpClient = httpClient; @@ -159,7 +159,8 @@ public SafeFuture signAggregationSlot(final UInt64 slot, final For signingRootUtil.signingRootForSignAggregationSlot(slot, forkInfo), SignType.AGGREGATION_SLOT, Map.of("aggregation_slot", Map.of("slot", slot), FORK_INFO, forkInfo(forkInfo)), - slashableGenericMessage("aggregation slot"))); + slashableGenericMessage("aggregation slot")), + true); } @Override @@ -257,21 +258,23 @@ public SafeFuture signContributionAndProof( @Override public SafeFuture signValidatorRegistration( final ValidatorRegistration validatorRegistration) { - return sign( - signingRootUtil.signingRootForValidatorRegistration(validatorRegistration), - SignType.VALIDATOR_REGISTRATION, - Map.of( - "validator_registration", - Map.of( - "fee_recipient", - validatorRegistration.getFeeRecipient().toHexString(), - "gas_limit", - validatorRegistration.getGasLimit(), - "timestamp", - validatorRegistration.getTimestamp(), - "pubkey", - validatorRegistration.getPublicKey().toString())), - slashableGenericMessage("validator registration")); + return taskQueue.queueTask( + () -> + sign( + signingRootUtil.signingRootForValidatorRegistration(validatorRegistration), + SignType.VALIDATOR_REGISTRATION, + Map.of( + "validator_registration", + Map.of( + "fee_recipient", + validatorRegistration.getFeeRecipient().toHexString(), + "gas_limit", + validatorRegistration.getGasLimit(), + "timestamp", + validatorRegistration.getTimestamp(), + "pubkey", + validatorRegistration.getPublicKey().toString())), + slashableGenericMessage("validator registration"))); } @Override diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorSourceTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorSourceTest.java index 452eafb0f01..40e1d2fc147 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorSourceTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/loader/ExternalValidatorSourceTest.java @@ -43,7 +43,7 @@ import tech.pegasys.teku.bls.BLSPublicKey; import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; -import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue; +import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueueWithPriority; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.service.serviceutils.layout.DataDirLayout; @@ -65,7 +65,7 @@ public class ExternalValidatorSourceTest { private final HttpClient httpClient = mock(HttpClient.class); private final MetricsSystem metricsSystem = new StubMetricsSystem(); private final AsyncRunner asyncRunner = new StubAsyncRunner(); - private ThrottlingTaskQueue externalSignerTaskQueue; + private ThrottlingTaskQueueWithPriority externalSignerTaskQueue; private final Supplier httpClientFactory = () -> httpClient; @@ -84,7 +84,7 @@ void setup(@TempDir Path tempDir) throws IOException, InterruptedException { .validatorExternalSignerUrl(new URL("http://localhost:9000")) .build(); externalSignerTaskQueue = - new ThrottlingTaskQueue( + ThrottlingTaskQueueWithPriority.create( config.getValidatorExternalSignerConcurrentRequestLimit(), metricsSystem, TekuMetricCategory.VALIDATOR, diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/loader/SlashingProtectionLoggerTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/loader/SlashingProtectionLoggerTest.java index cc98f4c30ae..20c0504bcb8 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/loader/SlashingProtectionLoggerTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/loader/SlashingProtectionLoggerTest.java @@ -36,7 +36,7 @@ import tech.pegasys.teku.bls.BLSPublicKey; import tech.pegasys.teku.ethereum.signingrecord.ValidatorSigningRecord; import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; -import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue; +import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueueWithPriority; import tech.pegasys.teku.infrastructure.logging.ValidatorLogger; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -164,7 +164,7 @@ private Validator createUnProtectedValidator(BLSPublicKey publicKey) { new URL("http://127.0.0.1/"), publicKey, TIMEOUT, - mock(ThrottlingTaskQueue.class), + mock(ThrottlingTaskQueueWithPriority.class), metricsSystem); } catch (MalformedURLException e) { throw new RuntimeException(e);