Skip to content

Commit

Permalink
call engine_forkChoiceUpdated with PayloadAttributes using default …
Browse files Browse the repository at this point in the history
…fee recipient if proposer is not prepared (#4882)

* call engine_forkChoiceUpdated with PayloadAttributes using default fee recipient if proposer is not prepared
* rename --Xvalidators-suggested-fee-recipient-address to --Xvalidators-proposer-default-fee-recipient
* better separation between ForkChoiceNotifier and ForkChoiceUpdateData
* various warn\error messages
  • Loading branch information
tbenr authored Jan 21, 2022
1 parent 3607680 commit f88438e
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package tech.pegasys.teku.statetransition.forkchoice;

import static tech.pegasys.teku.infrastructure.logging.StatusLogger.STATUS_LOG;

import java.util.Collection;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
Expand All @@ -22,14 +24,14 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.AsyncRunnerEventThread;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.ssz.type.Bytes20;
import tech.pegasys.teku.infrastructure.ssz.type.Bytes8;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.datastructures.operations.versions.bellatrix.BeaconPreparableProposer;
import tech.pegasys.teku.spec.executionengine.ExecutionEngineChannel;
import tech.pegasys.teku.spec.executionengine.ForkChoiceState;
import tech.pegasys.teku.spec.executionengine.PayloadAttributes;
import tech.pegasys.teku.storage.client.RecentChainData;

public class ForkChoiceNotifier {
Expand All @@ -42,8 +44,6 @@ public class ForkChoiceNotifier {
private final Spec spec;

private ForkChoiceUpdateData forkChoiceUpdateData = new ForkChoiceUpdateData();
private long payloadAttributesSequenceProducer = 0;
private long payloadAttributesSequenceConsumer = -1;

private boolean inSync = false; // Assume we are not in sync at startup.

Expand All @@ -64,7 +64,8 @@ public static ForkChoiceNotifier create(
final AsyncRunnerFactory asyncRunnerFactory,
final Spec spec,
final ExecutionEngineChannel executionEngineChannel,
final RecentChainData recentChainData) {
final RecentChainData recentChainData,
final Optional<? extends Bytes20> proposerDefaultFeeRecipient) {
final AsyncRunnerEventThread eventThread =
new AsyncRunnerEventThread("forkChoiceNotifier", asyncRunnerFactory);
eventThread.start();
Expand All @@ -73,7 +74,8 @@ public static ForkChoiceNotifier create(
spec,
executionEngineChannel,
recentChainData,
new PayloadAttributesCalculator(spec, eventThread, recentChainData));
new PayloadAttributesCalculator(
spec, eventThread, recentChainData, proposerDefaultFeeRecipient));
}

public void onUpdatePreparableProposers(final Collection<BeaconPreparableProposer> proposers) {
Expand Down Expand Up @@ -113,6 +115,7 @@ private void internalTerminalBlockReached(Bytes32 executionBlockHash) {

/**
* @param parentBeaconBlockRoot root of the beacon block the new block will be built on
* @param blockSlot slot of the block being produced, for which the payloadId has been requested
* @return must return a Future resolving to:
* <p>Optional.empty() only when is safe to produce a block with an empty execution payload
* (after the bellatrix fork and before Terminal Block arrival)
Expand Down Expand Up @@ -150,7 +153,7 @@ private SafeFuture<Optional<Bytes8>> internalGetPayloadId(
// returns, we save locally the current class reference.
ForkChoiceUpdateData localForkChoiceUpdateData = forkChoiceUpdateData;
return payloadAttributesCalculator
.calculatePayloadAttributes(blockSlot, inSync, localForkChoiceUpdateData)
.calculatePayloadAttributes(blockSlot, inSync, localForkChoiceUpdateData, true)
.thenCompose(
newPayloadAttributes -> {

Expand Down Expand Up @@ -178,6 +181,10 @@ private void internalUpdatePreparableProposers(

LOG.debug("internalUpdatePreparableProposers proposers {}", proposers);

if (!payloadAttributesCalculator.isProposerDefaultFeeRecipientDefined()) {
STATUS_LOG.warnMissingProposerDefaultFeeRecipientWithPreparedBeaconProposerBeingCalled();
}

// Default to the genesis slot if we're pre-genesis.
final UInt64 currentSlot = recentChainData.getCurrentSlot().orElse(SpecConfig.GENESIS_SLOT);

Expand Down Expand Up @@ -218,46 +225,24 @@ private void sendForkChoiceUpdated() {
private void updatePayloadAttributes(final UInt64 blockSlot) {
LOG.debug("updatePayloadAttributes blockSlot {}", blockSlot);

// we want to preserve ordering in payload calculation,
// so we first generate a sequence for each calculation request
final long sequenceNumber = payloadAttributesSequenceProducer++;
payloadAttributesCalculator
.calculatePayloadAttributes(blockSlot, inSync, forkChoiceUpdateData)
.thenAcceptAsync(
newPayloadAttributes ->
updatePayloadAttributes(blockSlot, newPayloadAttributes, sequenceNumber),
forkChoiceUpdateData
.withPayloadAttributesAsync(
() ->
payloadAttributesCalculator.calculatePayloadAttributes(
blockSlot, inSync, forkChoiceUpdateData, false),
eventThread)
.thenAccept(
newForkChoiceUpdateData -> {
if (newForkChoiceUpdateData.isPresent()) {
forkChoiceUpdateData = newForkChoiceUpdateData.get();
sendForkChoiceUpdated();
}
})
.finish(
error ->
LOG.error("Failed to calculate payload attributes for slot {}", blockSlot, error));
}

private boolean updatePayloadAttributes(
final UInt64 blockSlot,
final Optional<PayloadAttributes> newPayloadAttributes,
final long sequenceNumber) {
eventThread.checkOnEventThread();

LOG.debug(
"updatePayloadAttributes blockSlot {} newPayloadAttributes {}",
blockSlot,
newPayloadAttributes);

// to preserve ordering we make sure we haven't already calculated a payload that has been
// requested later than the current one
if (sequenceNumber <= payloadAttributesSequenceConsumer) {
LOG.warn("Ignoring calculated payload attributes since it violates ordering");
return false;
}
payloadAttributesSequenceConsumer = sequenceNumber;

LOG.debug("updatePayloadAttributes blockSlot {} {}", blockSlot, newPayloadAttributes);

forkChoiceUpdateData = forkChoiceUpdateData.withPayloadAttributes(newPayloadAttributes);
sendForkChoiceUpdated();
return true;
}

public PayloadAttributesCalculator getPayloadAttributesCalculator() {
return payloadAttributesCalculator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package tech.pegasys.teku.statetransition.forkchoice;

import com.google.common.base.MoreObjects;
import com.google.common.base.Supplier;
import java.util.Optional;
import java.util.concurrent.Executor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
Expand All @@ -35,6 +37,9 @@ public class ForkChoiceUpdateData {
private final SafeFuture<Optional<Bytes8>> payloadId = new SafeFuture<>();
private boolean sent = false;

private long payloadAttributesSequenceProducer = 0;
private long payloadAttributesSequenceConsumer = -1;

public ForkChoiceUpdateData() {
this.forkChoiceState =
new ForkChoiceState(
Expand Down Expand Up @@ -87,6 +92,29 @@ public ForkChoiceUpdateData withTerminalBlockHash(final Bytes32 terminalBlockHas
forkChoiceState, payloadAttributes, Optional.of(terminalBlockHash));
}

public SafeFuture<Optional<ForkChoiceUpdateData>> withPayloadAttributesAsync(
final Supplier<SafeFuture<Optional<PayloadAttributes>>> payloadAttributesCalculator,
final Executor executor) {
// we want to preserve ordering in payload calculation,
// so we first generate a sequence for each calculation request
final long sequenceNumber = payloadAttributesSequenceProducer++;

return payloadAttributesCalculator
.get()
.thenApplyAsync(
newPayloadAttributes -> {
// to preserve ordering we make sure we haven't already calculated a payload that has
// been requested later than the current one
if (sequenceNumber <= payloadAttributesSequenceConsumer) {
LOG.warn("Ignoring calculated payload attributes since it violates ordering");
return Optional.empty();
}
payloadAttributesSequenceConsumer = sequenceNumber;
return Optional.of(this.withPayloadAttributes(newPayloadAttributes));
},
executor);
}

public boolean isPayloadIdSuitable(final Bytes32 parentExecutionHash, final UInt64 timestamp) {
if (payloadAttributes.isEmpty()) {
LOG.debug("isPayloadIdSuitable - payloadAttributes.isEmpty returning false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package tech.pegasys.teku.statetransition.forkchoice;

import static tech.pegasys.teku.infrastructure.logging.ValidatorLogger.VALIDATOR_LOGGER;

import com.google.common.collect.ImmutableMap;
import java.util.Collection;
import java.util.List;
Expand All @@ -23,6 +25,7 @@
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.ssz.type.Bytes20;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
Expand All @@ -39,12 +42,17 @@ public class PayloadAttributesCalculator {
private final EventThread eventThread;
private final RecentChainData recentChainData;
private final Map<UInt64, ProposerInfo> proposerInfoByValidatorIndex = new ConcurrentHashMap<>();
private final Optional<? extends Bytes20> proposerDefaultFeeRecipient;

public PayloadAttributesCalculator(
final Spec spec, final EventThread eventThread, final RecentChainData recentChainData) {
final Spec spec,
final EventThread eventThread,
final RecentChainData recentChainData,
final Optional<? extends Bytes20> proposerDefaultFeeRecipient) {
this.spec = spec;
this.eventThread = eventThread;
this.recentChainData = recentChainData;
this.proposerDefaultFeeRecipient = proposerDefaultFeeRecipient;
}

public void updateProposers(
Expand All @@ -64,7 +72,8 @@ public void updateProposers(
public SafeFuture<Optional<PayloadAttributes>> calculatePayloadAttributes(
final UInt64 blockSlot,
final boolean inSync,
final ForkChoiceUpdateData forkChoiceUpdateData) {
final ForkChoiceUpdateData forkChoiceUpdateData,
final boolean mandatoryPayloadAttributes) {
eventThread.checkOnEventThread();
if (!inSync) {
// We don't produce blocks while syncing so don't bother preparing the payload
Expand All @@ -82,25 +91,45 @@ public SafeFuture<Optional<PayloadAttributes>> calculatePayloadAttributes(
final UInt64 epoch = spec.computeEpochAtSlot(blockSlot);
return getStateInEpoch(epoch)
.thenApplyAsync(
maybeState -> calculatePayloadAttributes(blockSlot, epoch, maybeState), eventThread);
maybeState ->
calculatePayloadAttributes(
blockSlot, epoch, maybeState, mandatoryPayloadAttributes),
eventThread);
}

private Optional<PayloadAttributes> calculatePayloadAttributes(
final UInt64 blockSlot, final UInt64 epoch, final Optional<BeaconState> maybeState) {
final UInt64 blockSlot,
final UInt64 epoch,
final Optional<BeaconState> maybeState,
final boolean mandatoryPayloadAttributes) {
eventThread.checkOnEventThread();
if (maybeState.isEmpty()) {
return Optional.empty();
}
final BeaconState state = maybeState.get();
final UInt64 proposerIndex = UInt64.valueOf(spec.getBeaconProposerIndex(state, blockSlot));
final ProposerInfo proposerInfo = proposerInfoByValidatorIndex.get(proposerIndex);
if (proposerInfo == null) {
if (proposerInfo == null && !mandatoryPayloadAttributes) {
// Proposer is not one of our validators. No need to propose a block.
return Optional.empty();
}
final UInt64 timestamp = spec.computeTimeAtSlot(state, blockSlot);
final Bytes32 random = spec.getRandaoMix(state, epoch);
return Optional.of(new PayloadAttributes(timestamp, random, proposerInfo.feeRecipient));
return Optional.of(
new PayloadAttributes(timestamp, random, getFeeRecipient(proposerInfo, blockSlot)));
}

// this function MUST return a fee recipient.
private Bytes20 getFeeRecipient(final ProposerInfo proposerInfo, final UInt64 blockSlot) {
if (proposerInfo != null) {
return proposerInfo.feeRecipient;
}
if (proposerDefaultFeeRecipient.isPresent()) {
VALIDATOR_LOGGER.executionPayloadPreparedUsingBeaconDefaultFeeRecipient(blockSlot);
return proposerDefaultFeeRecipient.get();
}
throw new IllegalStateException(
"Unable to determine proposer fee recipient address for slot " + blockSlot);
}

private SafeFuture<Optional<BeaconState>> getStateInEpoch(final UInt64 requiredEpoch) {
Expand Down Expand Up @@ -130,4 +159,8 @@ public List<Map<String, Object>> getData() {
.build())
.collect(Collectors.toList());
}

public boolean isProposerDefaultFeeRecipientDefined() {
return proposerDefaultFeeRecipient.isPresent();
}
}
Loading

0 comments on commit f88438e

Please sign in to comment.