allow execution clients several seconds to construct blocks #4012

Merged (6 commits, Aug 23, 2022)
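
In outline, the change sends forkchoiceUpdated with payload attributes for the next slot as soon as the head is updated and the node knows it will propose that slot, caches the result as ForkchoiceUpdatedInformation, and at proposal time reuses the cached payloadId only if head root, finalized root, timestamp, and fee recipient all still match; otherwise it re-queries. Below is a minimal, self-contained sketch of that caching pattern using simplified stand-in types; it is not the nimbus-eth2 API, and the names onNewHead and payloadIdForProposal are illustrative only (see the diff that follows for the real procs).

# Simplified stand-ins; not the actual nimbus-eth2 types or engine API calls.
import std/options

type
  PayloadID = array[8, byte]
  ForkchoiceUpdatedInformation = object
    payloadId: PayloadID
    headBlockRoot: string        # stand-in for Eth2Digest
    finalizedBlockRoot: string
    timestamp: uint64
    feeRecipient: string         # stand-in for Eth1Address

var cachedFcU: Option[ForkchoiceUpdatedInformation]

proc onNewHead(headRoot, finalizedRoot: string, nextSlotTimestamp: uint64,
               feeRecipient: string, weProposeNextSlot: bool) =
  ## Called when the head updates; if we propose the next slot, issue
  ## forkchoiceUpdated with payload attributes now (giving the execution
  ## client several seconds to build the block) and remember the result.
  if not weProposeNextSlot:
    return
  let payloadId: PayloadID = [1'u8, 2, 3, 4, 5, 6, 7, 8]  # pretend EL answer
  cachedFcU = some ForkchoiceUpdatedInformation(
    payloadId: payloadId,
    headBlockRoot: headRoot,
    finalizedBlockRoot: finalizedRoot,
    timestamp: nextSlotTimestamp,
    feeRecipient: feeRecipient)

proc payloadIdForProposal(headRoot, finalizedRoot: string, timestamp: uint64,
                          feeRecipient: string): Option[PayloadID] =
  ## At proposal time, reuse the cached payload id only if every parameter
  ## still matches; otherwise the caller re-issues forkchoiceUpdated.
  if cachedFcU.isSome and
      cachedFcU.get.headBlockRoot == headRoot and
      cachedFcU.get.finalizedBlockRoot == finalizedRoot and
      cachedFcU.get.timestamp == timestamp and
      cachedFcU.get.feeRecipient == feeRecipient:
    some cachedFcU.get.payloadId
  else:
    none(PayloadID)

when isMainModule:
  onNewHead("0xhead", "0xfinal", 12'u64, "0xfee", weProposeNextSlot = true)
  assert payloadIdForProposal("0xhead", "0xfinal", 12, "0xfee").isSome
  assert payloadIdForProposal("0xother", "0xfinal", 12, "0xfee").isNone
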
2 changes: 1 addition & 1 deletion beacon_chain/beacon_node.nim
@@ -89,7 +89,7 @@ type
stateTtlCache*: StateTtlCache
nextExchangeTransitionConfTime*: Moment
router*: ref MessageRouter
dynamicFeeRecipientsStore*: DynamicFeeRecipientsStore
dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore

const
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT
103 changes: 101 additions & 2 deletions beacon_chain/consensus_object_pools/consensus_manager.nim
@@ -16,7 +16,20 @@ import
../consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool],
../eth1/eth1_monitor

from ../spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, getDynamicFeeRecipient
from ../validators/keystore_management import
KeymanagerHost, getSuggestedFeeRecipient

type
ForkChoiceUpdatedInformation* = object
payloadId*: PayloadID
headBlockRoot*: Eth2Digest
safeBlockRoot*: Eth2Digest
finalizedBlockRoot*: Eth2Digest
timestamp*: uint64
feeRecipient*: Eth1Address

ConsensusManager* = object
expectedSlot: Slot
expectedBlockReceived: Future[bool]
@@ -34,20 +34,37 @@ type
# ----------------------------------------------------------------
eth1Monitor*: Eth1Monitor

# Allow determination of preferred fee recipient during proposals
# ----------------------------------------------------------------
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore
keymanagerHost: ref KeymanagerHost
defaultFeeRecipient: Eth1Address

# Tracking last proposal forkchoiceUpdated payload information
# ----------------------------------------------------------------
forkchoiceUpdatedInfo*: Opt[ForkchoiceUpdatedInformation]

# Initialization
# ------------------------------------------------------------------------------

func new*(T: type ConsensusManager,
dag: ChainDAGRef,
attestationPool: ref AttestationPool,
quarantine: ref Quarantine,
eth1Monitor: Eth1Monitor
eth1Monitor: Eth1Monitor,
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore,
keymanagerHost: ref KeymanagerHost,
defaultFeeRecipient: Eth1Address
): ref ConsensusManager =
(ref ConsensusManager)(
dag: dag,
attestationPool: attestationPool,
quarantine: quarantine,
eth1Monitor: eth1Monitor
eth1Monitor: eth1Monitor,
dynamicFeeRecipientsStore: dynamicFeeRecipientsStore,
keymanagerHost: keymanagerHost,
forkchoiceUpdatedInfo: Opt.none ForkchoiceUpdatedInformation,
defaultFeeRecipient: defaultFeeRecipient
)

# Consensus Management
@@ -182,6 +212,71 @@ proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =

self.updateHead(newHead)

proc checkNextProposer(dag: ChainDAGRef, slot: Slot):
Opt[(ValidatorIndex, ValidatorPubKey)] =
let proposer = dag.getProposer(dag.head, slot + 1)
if proposer.isNone():
return Opt.none((ValidatorIndex, ValidatorPubKey))
Opt.some((proposer.get, dag.validatorKey(proposer.get).get().toPubKey))
Contributor: lgtm, dag.getProposer already validates that dag.validatorKey will return some result.


proc getFeeRecipient*(
self: ref ConsensusManager, pubkey: ValidatorPubKey, validatorIdx: ValidatorIndex,
epoch: Epoch): Eth1Address =
self.dynamicFeeRecipientsStore[].getDynamicFeeRecipient(validatorIdx, epoch).valueOr:
if self.keymanagerHost != nil:
self.keymanagerHost[].getSuggestedFeeRecipient(pubkey).valueOr:
self.defaultFeeRecipient
else:
self.defaultFeeRecipient

from ../spec/datatypes/bellatrix import PayloadID

proc runProposalForkchoiceUpdated*(self: ref ConsensusManager) {.async.} =
withState(self.dag.headState):
let
nextSlot = state.data.slot + 1
Contributor: Edge cases:

  • During sync
  • When the latest N slots were empty, e.g., we will propose wallSlot + 1, but dag.headState.slot < wallSlot

Contributor Author: Wastes a bit of time on the EL locally, nothing else.

(validatorIndex, nextProposer) =
self.dag.checkNextProposer(nextSlot).valueOr:
return

    # Approximately lines up with validator_duties version. Used optimistically/
# opportunistically, so mismatches are fine if not too frequent.
let
timestamp = compute_timestamp_at_slot(state.data, nextSlot)
randomData =
get_randao_mix(state.data, get_current_epoch(state.data)).data
feeRecipient = self.getFeeRecipient(
nextProposer, validatorIndex, nextSlot.epoch)
headBlockRoot = self.dag.loadExecutionBlockRoot(self.dag.head)
Contributor: As this is called from an async context, dag.head may already point to a new head by this time. Around the merge, this could be a non-execution block with headBlockRoot.isZero, which is not documented as an allowed value for forkChoiceUpdated (in the runForkchoiceUpdated wrapper, it even asserts for that).

finalizedBlockRoot =
self.dag.loadExecutionBlockRoot(self.dag.finalizedHead.blck)

if headBlockRoot.isZero:
return

try:
let fcResult = awaitWithTimeout(
forkchoiceUpdated(
self.eth1Monitor, headBlockRoot, finalizedBlockRoot, timestamp,
randomData, feeRecipient),
FORKCHOICEUPDATED_TIMEOUT):
debug "runProposalForkchoiceUpdated: forkchoiceUpdated timed out"
ForkchoiceUpdatedResponse(
payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing))

if fcResult.payloadStatus.status != PayloadExecutionStatus.valid or
fcResult.payloadId.isNone:
return

self.forkchoiceUpdatedInfo = Opt.some ForkchoiceUpdatedInformation(
payloadId: bellatrix.PayloadID(fcResult.payloadId.get),
headBlockRoot: headBlockRoot,
finalizedBlockRoot: finalizedBlockRoot,
timestamp: timestamp,
feeRecipient: feeRecipient)
except CatchableError as err:
error "Engine API fork-choice update failed", err = err.msg
Contributor: strictVerification assert

Contributor Author: Not sure if ideal. Those are maybe best when it's verifying something truly Nimbus-internal, but this isn't. There are exogenous factors involved here, and strictVerification, though strict, shouldn't trigger except on actual Nimbus logic errors fixable solely in Nimbus code.


proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BlockRef)
{.async.} =
## Trigger fork choice and update the DAG with the new head block
@@ -197,6 +292,10 @@ proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BlockRef)
# justified and finalized
self.dag.updateHead(newHead, self.quarantine[])

# TODO after things stabilize with this, check for upcoming proposal and
# don't bother sending first fcU, but initially, keep both in place
asyncSpawn self.runProposalForkchoiceUpdated()
Contributor (on lines +295 to +297): There is yet another forkChoiceUpdated in block_processor.nim (expectValidForkchoiceUpdated). The function here (updateHeadWithExecution) is only called when a block is processed with a payload that initially returned an inconclusive result (syncing/accepted). The other one is called on blocks whose payloads already have a valid verdict. It seems a bit counterintuitive to only do this if there was a recent syncing/accepted block but not if there were only valid blocks.

Contributor Author: It has to wait until dag.updateHead runs, which happens after the bulk of the block_processor/consensus_manager logic, because that's when/where dag.headState is updated, which is necessary to get the RANDAO information from said state.

Contributor Author: f8b4984 runs it when newPayload is VALID.


self[].checkExpectedBlock()
except CatchableError as exc:
debug "updateHeadWithExecution error",
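
For reference, the fee-recipient precedence that getFeeRecipient above implements is: a per-validator mapping registered at runtime through the validator REST API (the DynamicFeeRecipientsStore, populated in rest_validator_api.nim below), then the keymanager's suggested fee recipient, then the node's configured default. A minimal self-contained sketch of that precedence, with stand-in types (a Table instead of DynamicFeeRecipientsStore, an Option instead of a KeymanagerHost lookup) and a hypothetical name pickFeeRecipient:

import std/[options, tables]

type
  Eth1Address = string          # stand-in for the real Eth1Address
  ValidatorIndex = uint64

proc pickFeeRecipient(dynamicStore: Table[ValidatorIndex, Eth1Address],
                      keymanagerSuggestion: Option[Eth1Address],
                      defaultFeeRecipient: Eth1Address,
                      validatorIdx: ValidatorIndex): Eth1Address =
  ## Same precedence as getFeeRecipient: dynamic mapping, then keymanager
  ## suggestion, then the node-wide default.
  if validatorIdx in dynamicStore:
    dynamicStore[validatorIdx]
  elif keymanagerSuggestion.isSome:
    keymanagerSuggestion.get
  else:
    defaultFeeRecipient

when isMainModule:
  var store = initTable[ValidatorIndex, Eth1Address]()
  store[7] = "0xdynamic"
  assert pickFeeRecipient(store, some("0xsuggested"), "0xdefault", 7) == "0xdynamic"
  assert pickFeeRecipient(store, some("0xsuggested"), "0xdefault", 8) == "0xsuggested"
  assert pickFeeRecipient(store, none(Eth1Address), "0xdefault", 8) == "0xdefault"
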
6 changes: 5 additions & 1 deletion beacon_chain/gossip_processing/block_processor.nim
@@ -17,7 +17,8 @@ import
../sszdump

from ../consensus_object_pools/consensus_manager import
ConsensusManager, runForkchoiceUpdated, updateHead, updateHeadWithExecution
ConsensusManager, runForkchoiceUpdated, runProposalForkchoiceUpdated,
updateHead, updateHeadWithExecution
from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
from ../consensus_object_pools/block_dag import BlockRef, root, slot
from ../consensus_object_pools/block_pools_types import BlockError, EpochRef
@@ -307,6 +308,9 @@ proc storeBlock*(
executionHeadRoot,
self.consensusManager.dag.loadExecutionBlockRoot(
self.consensusManager.dag.finalizedHead.blck))

# TODO remove redundant fcU in case of proposal
asyncSpawn self.consensusManager.runProposalForkchoiceUpdated()
else:
asyncSpawn self.consensusManager.updateHeadWithExecution(newHead.get)
else:
8 changes: 5 additions & 3 deletions beacon_chain/nimbus_beacon_node.nim
@@ -290,7 +290,9 @@ proc initFullNode(
exitPool = newClone(
ExitPool.init(dag, attestationPool, onVoluntaryExitAdded))
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.eth1Monitor)
dag, attestationPool, quarantine, node.eth1Monitor,
node.dynamicFeeRecipientsStore, node.keymanagerHost,
config.defaultFeeRecipient)
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime,
@@ -751,7 +753,7 @@ proc init*(T: type BeaconNode,
validatorMonitor: validatorMonitor,
stateTtlCache: stateTtlCache,
nextExchangeTransitionConfTime: nextExchangeTransitionConfTime,
dynamicFeeRecipientsStore: DynamicFeeRecipientsStore.init())
dynamicFeeRecipientsStore: newClone(DynamicFeeRecipientsStore.init()))

node.initLightClient(
rng, cfg, dag.forkDigests, getBeaconTime, dag.genesis_validators_root)
@@ -1201,7 +1203,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
node.syncCommitteeMsgPool[].pruneData(slot)
if slot.is_epoch:
node.trackNextSyncCommitteeTopics(slot)
node.dynamicFeeRecipientsStore.pruneOldMappings(slot.epoch)
node.dynamicFeeRecipientsStore[].pruneOldMappings(slot.epoch)

# Update upcoming actions - we do this every slot in case a reorg happens
let head = node.dag.head
2 changes: 1 addition & 1 deletion beacon_chain/rpc/rest_validator_api.nim
@@ -779,7 +779,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
currentEpoch = node.beaconClock.now.slotOrZero.epoch

for proposerData in body:
node.dynamicFeeRecipientsStore.addMapping(
node.dynamicFeeRecipientsStore[].addMapping(
proposerData.validator_index,
proposerData.fee_recipient,
currentEpoch)
32 changes: 27 additions & 5 deletions beacon_chain/validators/validator_duties.nim
@@ -354,11 +354,12 @@ proc get_execution_payload(
asConsensusExecutionPayload(
await execution_engine.getPayload(payload_id.get))

# TODO remove in favor of consensusManager copy
proc getFeeRecipient(node: BeaconNode,
pubkey: ValidatorPubKey,
validatorIdx: ValidatorIndex,
epoch: Epoch): Eth1Address =
node.dynamicFeeRecipientsStore.getDynamicFeeRecipient(validatorIdx, epoch).valueOr:
node.dynamicFeeRecipientsStore[].getDynamicFeeRecipient(validatorIdx, epoch).valueOr:
if node.keymanagerHost != nil:
node.keymanagerHost[].getSuggestedFeeRecipient(pubkey).valueOr:
node.config.defaultFeeRecipient
@@ -406,10 +407,31 @@ proc getExecutionPayload(
latestFinalized =
node.dag.loadExecutionBlockRoot(node.dag.finalizedHead.blck)
feeRecipient = node.getFeeRecipient(pubkey, validator_index, epoch)
payload_id = (await forkchoice_updated(
proposalState.bellatrixData.data, latestHead, latestFinalized,
feeRecipient,
node.consensusManager.eth1Monitor))
lastFcU = node.consensusManager.forkchoiceUpdatedInfo
timestamp = compute_timestamp_at_slot(
proposalState.bellatrixData.data,
proposalState.bellatrixData.data.slot)
payload_id =
if lastFcU.isSome and
lastFcU.get.headBlockRoot == latestHead and
lastFcU.get.finalizedBlockRoot == latestFinalized and
lastFcU.get.timestamp == timestamp and
lastFcU.get.feeRecipient == feeRecipient:
some bellatrix.PayloadID(lastFcU.get.payloadId)
else:
debug "getExecutionPayload: didn't find payloadId, re-querying",
latestHead,
latestFinalized,
timestamp,
feeRecipient,
cachedHeadBlockRoot = lastFcU.get.headBlockRoot,
cachedFinalizedBlockRoot = lastFcU.get.finalizedBlockRoot,
cachedTimestamp = lastFcU.get.timestamp,
cachedFeeRecipient = lastFcU.get.feeRecipient

(await forkchoice_updated(
proposalState.bellatrixData.data, latestHead, latestFinalized,
feeRecipient, node.consensusManager.eth1Monitor))
payload = try:
awaitWithTimeout(
get_execution_payload(payload_id, node.consensusManager.eth1Monitor),
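
The cached forkchoiceUpdated entry is reused only when head root, finalized root, timestamp, and fee recipient all still match. The timestamp comparison in particular pins the cached payload attributes to one specific slot, because the spec's compute_timestamp_at_slot depends only on the chain's genesis time and the slot. A stand-alone illustration; computeTimestampAtSlot is a simplified mirror of the spec function, and the constant and genesis time are example values only:

const SECONDS_PER_SLOT = 12'u64  # example value (mainnet uses 12 seconds)

func computeTimestampAtSlot(genesisTime, slot: uint64): uint64 =
  ## Simplified mirror of compute_timestamp_at_slot:
  ## genesis_time + slot * SECONDS_PER_SLOT.
  genesisTime + slot * SECONDS_PER_SLOT

when isMainModule:
  let genesisTime = 1_606_824_023'u64  # mainnet genesis, for illustration only
  # An entry cached for slot N + 1 cannot match a proposal at any other slot:
  # consecutive slots differ by SECONDS_PER_SLOT in their timestamps.
  assert computeTimestampAtSlot(genesisTime, 101) ==
         computeTimestampAtSlot(genesisTime, 100) + SECONDS_PER_SLOT
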
9 changes: 8 additions & 1 deletion tests/test_block_processor.nim
@@ -21,6 +21,10 @@ import
../beacon_chain/eth1/eth1_monitor,
./testutil, ./testdbutil, ./testblockutil

from ../beacon_chain/spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, init
from ../beacon_chain/validators/keystore_management import KeymanagerHost

proc pruneAtFinalization(dag: ChainDAGRef) =
if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG()
@@ -36,8 +40,11 @@ suite "Block processor" & preset():
quarantine = newClone(Quarantine.init())
attestationPool = newClone(AttestationPool.init(dag, quarantine))
eth1Monitor = new Eth1Monitor
keymanagerHost: ref KeymanagerHost
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, eth1Monitor)
dag, attestationPool, quarantine, eth1Monitor,
newClone(DynamicFeeRecipientsStore.init()), keymanagerHost,
default(Eth1Address))
state = newClone(dag.headState)
cache = StateCache()
b1 = addTestBlock(state[], cache).phase0Data