From cac8a51091dc64675565ced7827de0b2e3ee457f Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 4 Apr 2022 09:01:47 -0700 Subject: [PATCH 1/8] Add payload id cache --- beacon-chain/blockchain/BUILD.bazel | 1 + beacon-chain/blockchain/optimistic_sync.go | 68 +++++++++++++++++- .../blockchain/optimistic_sync_test.go | 50 +++++++++++++ beacon-chain/blockchain/options.go | 9 +++ .../blockchain/receive_attestation_test.go | 10 ++- beacon-chain/blockchain/service.go | 1 + beacon-chain/cache/BUILD.bazel | 2 + beacon-chain/cache/payload_id.go | 59 +++++++++++++++ beacon-chain/cache/payload_id_test.go | 34 +++++++++ beacon-chain/core/helpers/beacon_committee.go | 13 +++- .../core/helpers/beacon_committee_test.go | 2 +- beacon-chain/node/BUILD.bazel | 1 + beacon-chain/node/node.go | 5 ++ .../rpc/eth/validator/validator_test.go | 27 +++---- .../rpc/prysm/v1alpha1/validator/BUILD.bazel | 2 + .../prysm/v1alpha1/validator/assignments.go | 10 ++- .../v1alpha1/validator/assignments_test.go | 71 ++++++++++--------- .../validator/proposer_execution_payload.go | 24 +++++++ .../proposer_execution_payload_test.go | 14 +++- .../prysm/v1alpha1/validator/proposer_test.go | 4 +- .../rpc/prysm/v1alpha1/validator/server.go | 1 + beacon-chain/rpc/service.go | 2 + beacon-chain/sync/pending_blocks_queue.go | 16 +++-- .../shared/common/forkchoice/BUILD.bazel | 1 + .../shared/common/forkchoice/service.go | 2 + 25 files changed, 365 insertions(+), 64 deletions(-) create mode 100644 beacon-chain/cache/payload_id.go create mode 100644 beacon-chain/cache/payload_id_test.go diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 973767ef6b2c..21754c1a4dac 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -48,6 +48,7 @@ go_library( "//beacon-chain/core/transition:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/filters:go_default_library", + "//beacon-chain/db/kv:go_default_library", "//beacon-chain/forkchoice:go_default_library", "//beacon-chain/forkchoice/doubly-linked-tree:go_default_library", "//beacon-chain/forkchoice/protoarray:go_default_library", diff --git a/beacon-chain/blockchain/optimistic_sync.go b/beacon-chain/blockchain/optimistic_sync.go index 9e8bc5e91935..9c1b203c2b51 100644 --- a/beacon-chain/blockchain/optimistic_sync.go +++ b/beacon-chain/blockchain/optimistic_sync.go @@ -5,14 +5,21 @@ import ( "fmt" "github.com/pkg/errors" + types "github.com/prysmaticlabs/eth2-types" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/core/time" + "github.com/prysmaticlabs/prysm/beacon-chain/core/transition" + "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" "github.com/prysmaticlabs/prysm/beacon-chain/powchain" + "github.com/prysmaticlabs/prysm/beacon-chain/state" + fieldparams "github.com/prysmaticlabs/prysm/config/fieldparams" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/encoding/bytesutil" enginev1 "github.com/prysmaticlabs/prysm/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block" + "github.com/prysmaticlabs/prysm/time/slots" "github.com/sirupsen/logrus" "go.opencensus.io/trace" ) @@ -60,8 +67,13 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headBlk block.Beac FinalizedBlockHash: finalizedHash, } - // payload attribute is only required when requesting payload, here we are just updating fork choice, so it is nil. - payloadID, _, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, nil /*payload attribute*/) + nextSlot := s.CurrentSlot() + 1 // Cache payload ID for next slot proposer. + hasAttr, attr, proposerId, err := s.getPayloadAttribute(ctx, s.headState(ctx), nextSlot) + if err != nil { + return nil, errors.Wrap(err, "could not get payload attribute") + } + + payloadID, _, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, attr) if err != nil { switch err { case powchain.ErrAcceptedSyncingPayloadStatus: @@ -78,6 +90,9 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headBlk block.Beac if err := s.cfg.ForkChoiceStore.SetOptimisticToValid(ctx, headRoot); err != nil { return nil, errors.Wrap(err, "could not set block to valid") } + if hasAttr { // If the forkchoice update call has an attribute, update the proposer payload ID cache. + s.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(nextSlot, proposerId, bytesutil.BytesToUint64BigEndian(payloadID[:])) + } return payloadID, nil } @@ -168,3 +183,52 @@ func (s *Service) optimisticCandidateBlock(ctx context.Context, blk block.Beacon } return parentIsExecutionBlock, nil } + +// getPayloadAttributes returns the payload attributes for the given state and slot. +// The attribute is required to initiate a payload build process in the context of an `engine_forkchoiceUpdated` call. +func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState, slot types.Slot) (bool, *enginev1.PayloadAttributes, types.ValidatorIndex, error) { + proposerID, _, ok := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(slot) + if !ok { // There's no need to build attribute if there is no proposer for slot. + return false, nil, 0, nil + } + + // Get previous randao. + st = st.Copy() + st, err := transition.ProcessSlotsIfPossible(ctx, st, slot) + if err != nil { + return false, nil, 0, err + } + prevRando, err := helpers.RandaoMix(st, time.CurrentEpoch(st)) + if err != nil { + return false, nil, 0, nil + } + + // Get fee recipient. + feeRecipient := params.BeaconConfig().DefaultFeeRecipient + recipient, err := s.cfg.BeaconDB.FeeRecipientByValidatorID(ctx, proposerID) + switch err == nil { + case true: + feeRecipient = recipient + case errors.As(err, kv.ErrNotFoundFeeRecipient): + if feeRecipient.String() == fieldparams.EthBurnAddressHex { + logrus.WithFields(logrus.Fields{ + "validatorIndex": proposerID, + "burnAddress": fieldparams.EthBurnAddressHex, + }).Error("Fee recipient not set. Using burn address") + } + default: + return false, nil, 0, errors.Wrap(err, "could not get fee recipient in db") + } + + // Get timestamp. + t, err := slots.ToTime(uint64(s.genesisTime.Unix()), slot) + if err != nil { + return false, nil, 0, err + } + attr := &enginev1.PayloadAttributes{ + Timestamp: uint64(t.Unix()), + PrevRandao: prevRando, + SuggestedFeeRecipient: feeRecipient.Bytes(), + } + return true, attr, proposerID, nil +} diff --git a/beacon-chain/blockchain/optimistic_sync_test.go b/beacon-chain/blockchain/optimistic_sync_test.go index 0ea10cb43688..92f18014bef5 100644 --- a/beacon-chain/blockchain/optimistic_sync_test.go +++ b/beacon-chain/blockchain/optimistic_sync_test.go @@ -5,6 +5,9 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/common" + types "github.com/prysmaticlabs/eth2-types" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/beacon-chain/forkchoice/protoarray" @@ -24,6 +27,7 @@ import ( "github.com/prysmaticlabs/prysm/testing/require" "github.com/prysmaticlabs/prysm/testing/util" "github.com/prysmaticlabs/prysm/time/slots" + logTest "github.com/sirupsen/logrus/hooks/test" ) func Test_NotifyForkchoiceUpdate(t *testing.T) { @@ -44,8 +48,13 @@ func Test_NotifyForkchoiceUpdate(t *testing.T) { WithDatabase(beaconDB), WithStateGen(stategen.New(beaconDB)), WithForkChoiceStore(fcs), + WithProposerIdsCache(cache.NewProposerPayloadIDsCache()), } service, err := NewService(ctx, opts...) + st, _ := util.DeterministicGenesisState(t, 1) + service.head = &head{ + state: st, + } require.NoError(t, err) require.NoError(t, fcs.InsertOptimisticBlock(ctx, 0, [32]byte{}, [32]byte{}, params.BeaconConfig().ZeroHash, 0, 0)) @@ -573,6 +582,47 @@ func Test_IsOptimisticShallowExecutionParent(t *testing.T) { require.Equal(t, true, candidate) } +func Test_GetPayloadAttribute(t *testing.T) { + ctx := context.Background() + beaconDB := testDB.SetupDB(t) + opts := []Option{ + WithDatabase(beaconDB), + WithStateGen(stategen.New(beaconDB)), + WithProposerIdsCache(cache.NewProposerPayloadIDsCache()), + } + + // Cache miss + service, err := NewService(ctx, opts...) + require.NoError(t, err) + hasPayload, _, vId, err := service.getPayloadAttribute(ctx, nil, 0) + require.NoError(t, err) + require.Equal(t, false, hasPayload) + require.Equal(t, types.ValidatorIndex(0), vId) + + // Cache hit, advance state, no fee recipient + suggestedVid := types.ValidatorIndex(1) + slot := types.Slot(1) + service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, 0) + st, _ := util.DeterministicGenesisState(t, 1) + hook := logTest.NewGlobal() + hasPayload, attr, vId, err := service.getPayloadAttribute(ctx, st, slot) + require.NoError(t, err) + require.Equal(t, true, hasPayload) + require.Equal(t, suggestedVid, vId) + require.Equal(t, fieldparams.EthBurnAddressHex, common.BytesToAddress(attr.SuggestedFeeRecipient).String()) + require.LogsContain(t, hook, "Fee recipient not set. Using burn address") + + // Cache hit, advance state, has fee recipient + suggestedAddr := common.HexToAddress("123") + require.NoError(t, service.cfg.BeaconDB.SaveFeeRecipientsByValidatorIDs(ctx, []types.ValidatorIndex{suggestedVid}, []common.Address{suggestedAddr})) + service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, 0) + hasPayload, attr, vId, err = service.getPayloadAttribute(ctx, st, slot) + require.NoError(t, err) + require.Equal(t, true, hasPayload) + require.Equal(t, suggestedVid, vId) + require.Equal(t, suggestedAddr, common.BytesToAddress(attr.SuggestedFeeRecipient)) +} + func Test_UpdateLastValidatedCheckpoint(t *testing.T) { params.SetupTestConfigCleanup(t) params.OverrideBeaconConfig(params.MainnetConfig()) diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index f0ffa13b2073..4496d5116571 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -2,6 +2,7 @@ package blockchain import ( "github.com/prysmaticlabs/prysm/async/event" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" @@ -66,6 +67,14 @@ func WithDepositCache(c *depositcache.DepositCache) Option { } } +// WithProposerIdsCache for proposer id cache. +func WithProposerIdsCache(c *cache.ProposerPayloadIDsCache) Option { + return func(s *Service) error { + s.cfg.ProposerSlotIndexCache = c + return nil + } +} + // WithAttestationPool for attestation lifecycle after chain inclusion. func WithAttestationPool(p attestations.Pool) Option { return func(s *Service) error { diff --git a/beacon-chain/blockchain/receive_attestation_test.go b/beacon-chain/blockchain/receive_attestation_test.go index c1643228b453..3bfc3179d911 100644 --- a/beacon-chain/blockchain/receive_attestation_test.go +++ b/beacon-chain/blockchain/receive_attestation_test.go @@ -6,6 +6,7 @@ import ( "time" types "github.com/prysmaticlabs/eth2-types" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/transition" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" @@ -133,7 +134,7 @@ func TestNotifyEngineIfChangedHead(t *testing.T) { service, err := NewService(ctx, opts...) require.NoError(t, err) - + service.cfg.ProposerSlotIndexCache = cache.NewProposerPayloadIDsCache() service.notifyEngineIfChangedHead(ctx, service.headRoot()) hookErr := "could not notify forkchoice update" finalizedErr := "could not get finalized checkpoint" @@ -151,13 +152,20 @@ func TestNotifyEngineIfChangedHead(t *testing.T) { r, err := b.Block.HashTreeRoot() require.NoError(t, err) finalized := ðpb.Checkpoint{Root: r[:], Epoch: 0} + st, _ := util.DeterministicGenesisState(t, 1) service.head = &head{ slot: 1, root: r, block: wsb, + state: st, } + service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, 1) service.store.SetFinalizedCheckpt(finalized) service.notifyEngineIfChangedHead(ctx, [32]byte{'b'}) require.LogsDoNotContain(t, hook, finalizedErr) require.LogsDoNotContain(t, hook, hookErr) + vId, payloadID, has := service.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(2) + require.Equal(t, true, has) + require.Equal(t, types.ValidatorIndex(1), vId) + require.Equal(t, uint64(1), payloadID) } diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 0689441e65c6..e2e3e64d588c 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -74,6 +74,7 @@ type config struct { ChainStartFetcher powchain.ChainStartFetcher BeaconDB db.HeadAccessDatabase DepositCache *depositcache.DepositCache + ProposerSlotIndexCache *cache.ProposerPayloadIDsCache AttPool attestations.Pool ExitPool voluntaryexits.PoolManager SlashingPool slashings.PoolManager diff --git a/beacon-chain/cache/BUILD.bazel b/beacon-chain/cache/BUILD.bazel index b07500d5f255..ab93b475468c 100644 --- a/beacon-chain/cache/BUILD.bazel +++ b/beacon-chain/cache/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "common.go", "doc.go", "error.go", + "payload_id.go", "proposer_indices.go", "proposer_indices_disabled.go", # keep "proposer_indices_type.go", @@ -61,6 +62,7 @@ go_test( "checkpoint_state_test.go", "committee_fuzz_test.go", "committee_test.go", + "payload_id_test.go", "proposer_indices_test.go", "skip_slot_cache_test.go", "subnet_ids_test.go", diff --git a/beacon-chain/cache/payload_id.go b/beacon-chain/cache/payload_id.go new file mode 100644 index 000000000000..1f2c2b4f3951 --- /dev/null +++ b/beacon-chain/cache/payload_id.go @@ -0,0 +1,59 @@ +package cache + +import ( + "sync" + + types "github.com/prysmaticlabs/eth2-types" + "github.com/prysmaticlabs/prysm/encoding/bytesutil" +) + +const vIdLength = 8 +const pIdLength = 8 +const vpIdsLength = vIdLength + pIdLength + +// ProposerPayloadIDsCache is a cache of proposer payload IDs. +// The key is the slot. The value is the concatenation of the proposer and payload IDs. 8 bytes each. +type ProposerPayloadIDsCache struct { + slotToProposerAndPayloadIDs map[types.Slot][vpIdsLength]byte + sync.RWMutex +} + +// NewProposerPayloadIDsCache creates a new proposer payload IDs cache. +func NewProposerPayloadIDsCache() *ProposerPayloadIDsCache { + return &ProposerPayloadIDsCache{ + slotToProposerAndPayloadIDs: make(map[types.Slot][vpIdsLength]byte), + } +} + +// GetProposerPayloadIDs returns the proposer and payload IDs for the given slot. +func (f *ProposerPayloadIDsCache) GetProposerPayloadIDs(slot types.Slot) (types.ValidatorIndex, uint64, bool) { + f.RLock() + defer f.RUnlock() + ids, ok := f.slotToProposerAndPayloadIDs[slot] + if !ok { + return 0, 0, false + } + vId := ids[:vIdLength] + pId := ids[vIdLength:] + return types.ValidatorIndex(bytesutil.BytesToUint64BigEndian(vId)), bytesutil.BytesToUint64BigEndian(pId), true +} + +// SetProposerAndPayloadIDs sets the proposer and payload IDs for the given slot. +func (f *ProposerPayloadIDsCache) SetProposerAndPayloadIDs(slot types.Slot, vId types.ValidatorIndex, pId uint64) { + f.Lock() + defer f.Unlock() + var vIdBytes [vIdLength]byte + copy(vIdBytes[:], bytesutil.Uint64ToBytesBigEndian(uint64(vId))) + var pIdBytes [pIdLength]byte + copy(pIdBytes[:], bytesutil.Uint64ToBytesBigEndian(pId)) + + var bytes [vpIdsLength]byte + copy(bytes[:], append(vIdBytes[:], pIdBytes[:]...)) + + _, ok := f.slotToProposerAndPayloadIDs[slot] + // Ok to overwrite if the slot is already set but the payload ID is not set. + // This combats the re-org case where payload assignment could change the epoch of. + if !ok || (ok && pId != 0) { + f.slotToProposerAndPayloadIDs[slot] = bytes + } +} diff --git a/beacon-chain/cache/payload_id_test.go b/beacon-chain/cache/payload_id_test.go new file mode 100644 index 000000000000..b9711d100942 --- /dev/null +++ b/beacon-chain/cache/payload_id_test.go @@ -0,0 +1,34 @@ +package cache + +import ( + "testing" + + types "github.com/prysmaticlabs/eth2-types" + "github.com/prysmaticlabs/prysm/testing/require" +) + +func TestValidatorPayloadIDsCache_GetAndSaveValidatorPayloadIDs(t *testing.T) { + cache := NewProposerPayloadIDsCache() + i, p, ok := cache.GetProposerPayloadIDs(0) + require.Equal(t, false, ok) + require.Equal(t, types.ValidatorIndex(0), i) + require.Equal(t, uint64(0), p) + + slot := types.Slot(1234) + vid := types.ValidatorIndex(34234324) + pid := uint64(234234) + cache.SetProposerAndPayloadIDs(slot, vid, pid) + i, p, ok = cache.GetProposerPayloadIDs(slot) + require.Equal(t, true, ok) + require.Equal(t, vid, i) + require.Equal(t, pid, p) + + slot = types.Slot(9456456) + vid = types.ValidatorIndex(6786745) + pid = uint64(87687) + cache.SetProposerAndPayloadIDs(slot, vid, pid) + i, p, ok = cache.GetProposerPayloadIDs(slot) + require.Equal(t, true, ok) + require.Equal(t, vid, i) + require.Equal(t, pid, p) +} diff --git a/beacon-chain/core/helpers/beacon_committee.go b/beacon-chain/core/helpers/beacon_committee.go index d77d0f5725b9..894d4ea80c75 100644 --- a/beacon-chain/core/helpers/beacon_committee.go +++ b/beacon-chain/core/helpers/beacon_committee.go @@ -175,9 +175,7 @@ func CommitteeAssignments( return nil, nil, err } proposerIndexToSlots := make(map[types.ValidatorIndex][]types.Slot, params.BeaconConfig().SlotsPerEpoch) - // Proposal epochs do not have a look ahead, so we skip them over here. - validProposalEpoch := epoch < nextEpoch - for slot := startSlot; slot < startSlot+params.BeaconConfig().SlotsPerEpoch && validProposalEpoch; slot++ { + for slot := startSlot; slot < startSlot+params.BeaconConfig().SlotsPerEpoch; slot++ { // Skip proposer assignment for genesis slot. if slot == 0 { continue @@ -192,6 +190,15 @@ func CommitteeAssignments( proposerIndexToSlots[i] = append(proposerIndexToSlots[i], slot) } + // If previous proposer indices computation is outside if current proposal epoch range, + // we need to reset state slot back to start slot so that we can compute the correct committees. + currentProposalEpoch := epoch < nextEpoch + if !currentProposalEpoch { + if err := state.SetSlot(state.Slot() - params.BeaconConfig().SlotsPerEpoch); err != nil { + return nil, nil, err + } + } + activeValidatorIndices, err := ActiveValidatorIndices(ctx, state, epoch) if err != nil { return nil, nil, err diff --git a/beacon-chain/core/helpers/beacon_committee_test.go b/beacon-chain/core/helpers/beacon_committee_test.go index a7bb6d286f69..14030c969fe9 100644 --- a/beacon-chain/core/helpers/beacon_committee_test.go +++ b/beacon-chain/core/helpers/beacon_committee_test.go @@ -232,7 +232,7 @@ func TestCommitteeAssignments_CannotRetrieveFuture(t *testing.T) { _, proposerIndxs, err = CommitteeAssignments(context.Background(), state, time.CurrentEpoch(state)+1) require.NoError(t, err) - require.Equal(t, 0, len(proposerIndxs), "wanted empty proposer index set") + require.NotEqual(t, 0, len(proposerIndxs), "wanted non-zero proposer index set") } func TestCommitteeAssignments_EverySlotHasMin1Proposer(t *testing.T) { diff --git a/beacon-chain/node/BUILD.bazel b/beacon-chain/node/BUILD.bazel index 54fedc626b8c..eece8797176e 100644 --- a/beacon-chain/node/BUILD.bazel +++ b/beacon-chain/node/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//api/gateway:go_default_library", "//async/event:go_default_library", "//beacon-chain/blockchain:go_default_library", + "//beacon-chain/cache:go_default_library", "//beacon-chain/cache/depositcache:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/kv:go_default_library", diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index bf5ec0969109..4e6ba1ef9a98 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -21,6 +21,7 @@ import ( apigateway "github.com/prysmaticlabs/prysm/api/gateway" "github.com/prysmaticlabs/prysm/async/event" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" @@ -94,6 +95,7 @@ type BeaconNode struct { slashingsPool slashings.PoolManager syncCommitteePool synccommittee.Pool depositCache *depositcache.DepositCache + proposerIdsCache *cache.ProposerPayloadIDsCache stateFeed *event.Feed blockFeed *event.Feed opFeed *event.Feed @@ -152,6 +154,7 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) { slasherBlockHeadersFeed: new(event.Feed), slasherAttestationsFeed: new(event.Feed), serviceFlagOpts: &serviceFlagOpts{}, + proposerIdsCache: cache.NewProposerPayloadIDsCache(), } for _, opt := range opts { @@ -585,6 +588,7 @@ func (b *BeaconNode) registerBlockchainService() error { blockchain.WithStateGen(b.stateGen), blockchain.WithSlasherAttestationsFeed(b.slasherAttestationsFeed), blockchain.WithFinalizedStateAtStartUp(b.finalizedStateAtStartUp), + blockchain.WithProposerIdsCache(b.proposerIdsCache), ) blockchainService, err := blockchain.NewService(b.ctx, opts...) if err != nil { @@ -801,6 +805,7 @@ func (b *BeaconNode) registerRPCService() error { StateGen: b.stateGen, EnableDebugRPCEndpoints: enableDebugRPCEndpoints, MaxMsgSize: maxMsgSize, + ProposerIdsCache: b.proposerIdsCache, ExecutionEngineCaller: web3Service, }) diff --git a/beacon-chain/rpc/eth/validator/validator_test.go b/beacon-chain/rpc/eth/validator/validator_test.go index c65ac8ae53fa..41d863adfb04 100644 --- a/beacon-chain/rpc/eth/validator/validator_test.go +++ b/beacon-chain/rpc/eth/validator/validator_test.go @@ -1024,19 +1024,20 @@ func TestProduceBlockV2(t *testing.T) { TotalDifficulty: "0x1", }, }, - TimeFetcher: &mockChain.ChainService{}, - HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - BlockReceiver: &mockChain.ChainService{}, - ChainStartFetcher: &mockPOW.POWChain{}, - Eth1InfoFetcher: &mockPOW.POWChain{}, - Eth1BlockFetcher: &mockPOW.POWChain{}, - MockEth1Votes: true, - AttPool: attestations.NewPool(), - SlashingsPool: slashings.NewPool(), - ExitPool: voluntaryexits.NewPool(), - StateGen: stategen.New(db), - SyncCommitteePool: synccommittee.NewStore(), + TimeFetcher: &mockChain.ChainService{}, + HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + BlockReceiver: &mockChain.ChainService{}, + ChainStartFetcher: &mockPOW.POWChain{}, + Eth1InfoFetcher: &mockPOW.POWChain{}, + Eth1BlockFetcher: &mockPOW.POWChain{}, + MockEth1Votes: true, + AttPool: attestations.NewPool(), + SlashingsPool: slashings.NewPool(), + ExitPool: voluntaryexits.NewPool(), + StateGen: stategen.New(db), + SyncCommitteePool: synccommittee.NewStore(), + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } proposerSlashings := make([]*ethpbalpha.ProposerSlashing, params.BeaconConfig().MaxProposerSlashings) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel b/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel index a323f86b6c32..0cfe1e528b43 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel @@ -79,6 +79,8 @@ go_library( "@com_github_ferranbt_fastssz//:go_default_library", "@com_github_holiman_uint256//:go_default_library", "@com_github_pkg_errors//:go_default_library", + "@com_github_prometheus_client_golang//prometheus:go_default_library", + "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_prysmaticlabs_eth2_types//:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go index 228ffef8d0bc..d41e7dabf434 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go @@ -138,7 +138,7 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb. return nil, status.Errorf(codes.Internal, "Could not compute committee assignments: %v", err) } // Query the next epoch assignments for committee subnet subscriptions. - nextCommitteeAssignments, _, err := helpers.CommitteeAssignments(ctx, s, req.Epoch+1) + nextCommitteeAssignments, nextProposerIndexToSlots, err := helpers.CommitteeAssignments(ctx, s, req.Epoch+1) if err != nil { return nil, status.Errorf(codes.Internal, "Could not compute next committee assignments: %v", err) } @@ -180,6 +180,14 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb. nextAssignment.AttesterSlot = ca.AttesterSlot nextAssignment.CommitteeIndex = ca.CommitteeIndex } + // Cache proposer assignment for the current epoch. + for _, slot := range proposerIndexToSlots[idx] { + vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, idx, 0 /* payloadID */) + } + // Cache proposer assignment for the next epoch. + for _, slot := range nextProposerIndexToSlots[idx] { + vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, idx, 0 /* payloadID */) + } } else { // If the validator isn't in the beacon state, try finding their deposit to determine their status. vStatus, _ := vs.validatorStatus(ctx, s, pubKey) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go index b6ecb54d205c..6d1665518c39 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go @@ -60,9 +60,10 @@ func TestGetDuties_OK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now(), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, + HeadFetcher: chain, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. @@ -144,10 +145,11 @@ func TestGetAltairDuties_SyncCommitteeOK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now().Add(time.Duration(-1*int64(slot-1)) * time.Second), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - Eth1InfoFetcher: &mockPOW.POWChain{}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, + HeadFetcher: chain, + TimeFetcher: chain, + Eth1InfoFetcher: &mockPOW.POWChain{}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. @@ -181,12 +183,12 @@ func TestGetAltairDuties_SyncCommitteeOK(t *testing.T) { res, err = vs.GetDuties(context.Background(), req) require.NoError(t, err, "Could not call epoch committee assignment") for i := 0; i < len(res.CurrentEpochDuties); i++ { - assert.Equal(t, types.ValidatorIndex(i), res.CurrentEpochDuties[i].ValidatorIndex) + require.Equal(t, types.ValidatorIndex(i), res.CurrentEpochDuties[i].ValidatorIndex) } for i := 0; i < len(res.CurrentEpochDuties); i++ { - assert.Equal(t, true, res.CurrentEpochDuties[i].IsSyncCommittee) + require.Equal(t, true, res.CurrentEpochDuties[i].IsSyncCommittee) // Current epoch and next epoch duties should be equal before the sync period epoch boundary. - assert.Equal(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) + require.Equal(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) } // Current epoch and next epoch duties should not be equal at the sync period epoch boundary. @@ -197,7 +199,7 @@ func TestGetAltairDuties_SyncCommitteeOK(t *testing.T) { res, err = vs.GetDuties(context.Background(), req) require.NoError(t, err, "Could not call epoch committee assignment") for i := 0; i < len(res.CurrentEpochDuties); i++ { - assert.NotEqual(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) + require.NotEqual(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) } } @@ -249,10 +251,11 @@ func TestGetBellatrixDuties_SyncCommitteeOK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now().Add(time.Duration(-1*int64(slot-1)) * time.Second), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - Eth1InfoFetcher: &mockPOW.POWChain{}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, + HeadFetcher: chain, + TimeFetcher: chain, + Eth1InfoFetcher: &mockPOW.POWChain{}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. @@ -302,7 +305,7 @@ func TestGetBellatrixDuties_SyncCommitteeOK(t *testing.T) { res, err = vs.GetDuties(context.Background(), req) require.NoError(t, err, "Could not call epoch committee assignment") for i := 0; i < len(res.CurrentEpochDuties); i++ { - assert.NotEqual(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) + require.NotEqual(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) } } @@ -399,9 +402,10 @@ func TestGetDuties_CurrentEpoch_ShouldNotFail(t *testing.T) { State: bState, Root: genesisRoot[:], Genesis: time.Now(), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, + HeadFetcher: chain, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. @@ -437,9 +441,10 @@ func TestGetDuties_MultipleKeys_OK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now(), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, + HeadFetcher: chain, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } pubkey0 := deposits[0].Data.PublicKey @@ -503,11 +508,12 @@ func TestStreamDuties_OK(t *testing.T) { Genesis: time.Now(), } vs := &Server{ - Ctx: ctx, - HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - TimeFetcher: c, - StateNotifier: &mockChain.MockStateNotifier{}, + Ctx: ctx, + HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + TimeFetcher: c, + StateNotifier: &mockChain.MockStateNotifier{}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. @@ -560,11 +566,12 @@ func TestStreamDuties_OK_ChainReorg(t *testing.T) { Genesis: time.Now(), } vs := &Server{ - Ctx: ctx, - HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - TimeFetcher: c, - StateNotifier: &mockChain.MockStateNotifier{}, + Ctx: ctx, + HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + TimeFetcher: c, + StateNotifier: &mockChain.MockStateNotifier{}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go index e88ee7ea7336..8bd71dc65e66 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go @@ -9,6 +9,8 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/holiman/uint256" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" types "github.com/prysmaticlabs/eth2-types" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" @@ -24,9 +26,31 @@ import ( "github.com/sirupsen/logrus" ) +var ( + // payloadIDCacheMiss tracks the number of payload ID requests that aren't present in the cache. + payloadIDCacheMiss = promauto.NewCounter(prometheus.CounterOpts{ + Name: "payload_id_cache_miss", + Help: "The number of payload id get requests that aren't present in the cache.", + }) + // payloadIDCacheHit tracks the number of payload ID requests that are present in the cache. + payloadIDCacheHit = promauto.NewCounter(prometheus.CounterOpts{ + Name: "payload_id_cache_hit", + Help: "The number of payload id get requests that are present in the cache.", + }) +) + // This returns the execution payload of a given slot. The function has full awareness of pre and post merge. // The payload is computed given the respected time of merge. func (vs *Server) getExecutionPayload(ctx context.Context, slot types.Slot, vIdx types.ValidatorIndex) (*enginev1.ExecutionPayload, error) { + proposerID, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot) + if ok && proposerID == vIdx && payloadId != 0 { // Payload ID is cache hit. Return the cached payload ID. + var vIdBytes [8]byte + copy(vIdBytes[:], bytesutil.Uint64ToBytesBigEndian(payloadId)) + payloadIDCacheHit.Inc() + return vs.ExecutionEngineCaller.GetPayload(ctx, vIdBytes) + } + payloadIDCacheMiss.Inc() + st, err := vs.HeadFetcher.HeadState(ctx) if err != nil { return nil, err diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go index 90f26f8afd05..57319d3ac0d5 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go @@ -9,6 +9,7 @@ import ( "github.com/holiman/uint256" types "github.com/prysmaticlabs/eth2-types" chainMock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" dbTest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" powtesting "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/state" @@ -106,6 +107,11 @@ func TestServer_getExecutionPayload(t *testing.T) { payloadID: &pb.PayloadIDBytes{0x1}, validatorIndx: 1, }, + { + name: "transition completed, happy case, payload ID cached)", + st: transitionSt, + validatorIndx: 100, + }, { name: "transition completed, could not prepare payload", st: transitionSt, @@ -132,10 +138,12 @@ func TestServer_getExecutionPayload(t *testing.T) { params.OverrideBeaconConfig(cfg) vs := &Server{ - ExecutionEngineCaller: &powtesting.EngineClient{PayloadIDBytes: tt.payloadID, ErrForkchoiceUpdated: tt.forkchoiceErr}, - HeadFetcher: &chainMock.ChainService{State: tt.st}, - BeaconDB: beaconDB, + ExecutionEngineCaller: &powtesting.EngineClient{PayloadIDBytes: tt.payloadID, ErrForkchoiceUpdated: tt.forkchoiceErr}, + HeadFetcher: &chainMock.ChainService{State: tt.st}, + BeaconDB: beaconDB, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } + vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(tt.st.Slot(), 100, 100) _, err := vs.getExecutionPayload(context.Background(), tt.st.Slot(), tt.validatorIndx) if tt.errString != "" { require.ErrorContains(t, tt.errString, err) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go index 66d8c7b86bad..72f4ec671b44 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go @@ -10,6 +10,7 @@ import ( types "github.com/prysmaticlabs/eth2-types" "github.com/prysmaticlabs/go-bitfield" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" @@ -2344,7 +2345,8 @@ func TestProposer_GetBeaconBlock_BellatrixEpoch(t *testing.T) { PayloadIDBytes: &enginev1.PayloadIDBytes{1}, ExecutionPayload: payload, }, - BeaconDB: db, + BeaconDB: db, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } randaoReveal, err := util.RandaoReveal(beaconState, 0, privKeys) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/server.go b/beacon-chain/rpc/prysm/v1alpha1/validator/server.go index 5678d98ffe71..65af19832576 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/server.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/server.go @@ -41,6 +41,7 @@ import ( type Server struct { Ctx context.Context AttestationCache *cache.AttestationCache + ProposerSlotIndexCache *cache.ProposerPayloadIDsCache HeadFetcher blockchain.HeadFetcher ForkFetcher blockchain.ForkFetcher FinalizationFetcher blockchain.FinalizationFetcher diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index 78668fa2dc5d..6f7697d72f56 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -109,6 +109,7 @@ type Config struct { StateGen *stategen.State MaxMsgSize int ExecutionEngineCaller powchain.EngineCaller + ProposerIdsCache *cache.ProposerPayloadIDsCache } // NewService instantiates a new RPC service instance that will @@ -207,6 +208,7 @@ func (s *Service) Start() { ReplayerBuilder: ch, ExecutionEngineCaller: s.cfg.ExecutionEngineCaller, BeaconDB: s.cfg.BeaconDB, + ProposerSlotIndexCache: s.cfg.ProposerIdsCache, } validatorServerV1 := &validator.Server{ HeadFetcher: s.cfg.HeadFetcher, diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index 9dc4c648997f..0494534f002b 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -153,13 +153,15 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { } if err := s.validateBeaconBlock(ctx, b, blkRoot); err != nil { - log.Debugf("Could not validate block from slot %d: %v", b.Block().Slot(), err) - s.setBadBlock(ctx, blkRoot) - tracing.AnnotateError(span, err) - // In the next iteration of the queue, this block will be removed from - // the pending queue as it has been marked as a 'bad' block. - span.End() - continue + if !errors.Is(ErrOptimisticParent, err) { + log.Debugf("Could not validate block from slot %d: %v", b.Block().Slot(), err) + s.setBadBlock(ctx, blkRoot) + tracing.AnnotateError(span, err) + // In the next iteration of the queue, this block will be removed from + // the pending queue as it has been marked as a 'bad' block. + span.End() + continue + } } if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil { diff --git a/testing/spectest/shared/common/forkchoice/BUILD.bazel b/testing/spectest/shared/common/forkchoice/BUILD.bazel index 5aecfec048f7..07e5d574cb3e 100644 --- a/testing/spectest/shared/common/forkchoice/BUILD.bazel +++ b/testing/spectest/shared/common/forkchoice/BUILD.bazel @@ -13,6 +13,7 @@ go_library( deps = [ "//beacon-chain/blockchain:go_default_library", "//beacon-chain/blockchain/testing:go_default_library", + "//beacon-chain/cache:go_default_library", "//beacon-chain/cache/depositcache:go_default_library", "//beacon-chain/core/time:go_default_library", "//beacon-chain/db/testing:go_default_library", diff --git a/testing/spectest/shared/common/forkchoice/service.go b/testing/spectest/shared/common/forkchoice/service.go index 8e5b10167020..10064dc4c5e2 100644 --- a/testing/spectest/shared/common/forkchoice/service.go +++ b/testing/spectest/shared/common/forkchoice/service.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" coreTime "github.com/prysmaticlabs/prysm/beacon-chain/core/time" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" @@ -57,6 +58,7 @@ func startChainService(t *testing.T, st state.BeaconState, block block.SignedBea blockchain.WithStateNotifier(&mock.MockStateNotifier{}), blockchain.WithAttestationPool(attestations.NewPool()), blockchain.WithDepositCache(depositCache), + blockchain.WithProposerIdsCache(cache.NewProposerPayloadIDsCache())), ) service, err := blockchain.NewService(context.Background(), opts...) require.NoError(t, err) From a9fcd92485db59d7968a3a6aa1704c9eda227256 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 4 Apr 2022 10:12:43 -0700 Subject: [PATCH 2/8] Update BUILD.bazel --- beacon-chain/cache/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/beacon-chain/cache/BUILD.bazel b/beacon-chain/cache/BUILD.bazel index ab93b475468c..4f74800ec65e 100644 --- a/beacon-chain/cache/BUILD.bazel +++ b/beacon-chain/cache/BUILD.bazel @@ -27,6 +27,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/beacon-chain/cache", visibility = [ "//beacon-chain:__subpackages__", + "//testing/spectest:__subpackages__", "//tools:__subpackages__", ], deps = [ From 42f69523d53217f796a781d0a6122283c33f8757 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 4 Apr 2022 10:16:25 -0700 Subject: [PATCH 3/8] Update service.go --- testing/spectest/shared/common/forkchoice/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/spectest/shared/common/forkchoice/service.go b/testing/spectest/shared/common/forkchoice/service.go index 10064dc4c5e2..f246fb0a54ee 100644 --- a/testing/spectest/shared/common/forkchoice/service.go +++ b/testing/spectest/shared/common/forkchoice/service.go @@ -58,7 +58,7 @@ func startChainService(t *testing.T, st state.BeaconState, block block.SignedBea blockchain.WithStateNotifier(&mock.MockStateNotifier{}), blockchain.WithAttestationPool(attestations.NewPool()), blockchain.WithDepositCache(depositCache), - blockchain.WithProposerIdsCache(cache.NewProposerPayloadIDsCache())), + blockchain.WithProposerIdsCache(cache.NewProposerPayloadIDsCache()), ) service, err := blockchain.NewService(context.Background(), opts...) require.NoError(t, err) From 0b5bdcb065b9f0b88f0773aa887a91423885cb37 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 4 Apr 2022 11:05:36 -0700 Subject: [PATCH 4/8] Another option, remove optimistic sync assumption --- beacon-chain/sync/pending_blocks_queue.go | 16 +++++++-------- beacon-chain/sync/validate_beacon_blocks.go | 22 +-------------------- 2 files changed, 8 insertions(+), 30 deletions(-) diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index 0494534f002b..9dc4c648997f 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -153,15 +153,13 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { } if err := s.validateBeaconBlock(ctx, b, blkRoot); err != nil { - if !errors.Is(ErrOptimisticParent, err) { - log.Debugf("Could not validate block from slot %d: %v", b.Block().Slot(), err) - s.setBadBlock(ctx, blkRoot) - tracing.AnnotateError(span, err) - // In the next iteration of the queue, this block will be removed from - // the pending queue as it has been marked as a 'bad' block. - span.End() - continue - } + log.Debugf("Could not validate block from slot %d: %v", b.Block().Slot(), err) + s.setBadBlock(ctx, blkRoot) + tracing.AnnotateError(span, err) + // In the next iteration of the queue, this block will be removed from + // the pending queue as it has been marked as a 'bad' block. + span.End() + continue } if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil { diff --git a/beacon-chain/sync/validate_beacon_blocks.go b/beacon-chain/sync/validate_beacon_blocks.go index 98fbe2c294a5..54ee97ecd405 100644 --- a/beacon-chain/sync/validate_beacon_blocks.go +++ b/beacon-chain/sync/validate_beacon_blocks.go @@ -26,10 +26,6 @@ import ( "go.opencensus.io/trace" ) -var ( - ErrOptimisticParent = errors.New("parent of the block is optimistic") -) - // validateBeaconBlockPubSub checks that the incoming block has a valid BLS signature. // Blocks that have already been seen are ignored. If the BLS signature is any valid signature, // this method rebroadcasts the message. @@ -166,11 +162,7 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms err = s.validateBeaconBlock(ctx, blk, blockRoot) if err != nil { - // If the parent is optimistic, process the block as usual - // This also does not penalize a peer which sends optimistic blocks - if !errors.Is(ErrOptimisticParent, err) { - return pubsub.ValidationReject, err - } + return pubsub.ValidationReject, err } // Record attribute of valid block. @@ -230,10 +222,6 @@ func (s *Service) validateBeaconBlock(ctx context.Context, blk block.SignedBeaco } if err = s.validateBellatrixBeaconBlock(ctx, parentState, blk.Block()); err != nil { - if errors.Is(err, ErrOptimisticParent) { - return err - } - // for other kinds of errors, set this block as a bad block. s.setBadBlock(ctx, blockRoot) return err } @@ -282,14 +270,6 @@ func (s *Service) validateBellatrixBeaconBlock(ctx context.Context, parentState return errors.New("incorrect timestamp") } - parentRoot := bytesutil.ToBytes32(blk.ParentRoot()) - isParentOptimistic, err := s.cfg.chain.IsOptimisticForRoot(ctx, parentRoot) - if err != nil { - return err - } - if isParentOptimistic { - return ErrOptimisticParent - } return nil } From 08000250051c15eee58d5da237e7711b74769daf Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 4 Apr 2022 11:07:04 -0700 Subject: [PATCH 5/8] Revert "Another option, remove optimistic sync assumption" This reverts commit 0b5bdcb065b9f0b88f0773aa887a91423885cb37. --- beacon-chain/sync/pending_blocks_queue.go | 16 ++++++++------- beacon-chain/sync/validate_beacon_blocks.go | 22 ++++++++++++++++++++- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index 9dc4c648997f..0494534f002b 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -153,13 +153,15 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { } if err := s.validateBeaconBlock(ctx, b, blkRoot); err != nil { - log.Debugf("Could not validate block from slot %d: %v", b.Block().Slot(), err) - s.setBadBlock(ctx, blkRoot) - tracing.AnnotateError(span, err) - // In the next iteration of the queue, this block will be removed from - // the pending queue as it has been marked as a 'bad' block. - span.End() - continue + if !errors.Is(ErrOptimisticParent, err) { + log.Debugf("Could not validate block from slot %d: %v", b.Block().Slot(), err) + s.setBadBlock(ctx, blkRoot) + tracing.AnnotateError(span, err) + // In the next iteration of the queue, this block will be removed from + // the pending queue as it has been marked as a 'bad' block. + span.End() + continue + } } if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil { diff --git a/beacon-chain/sync/validate_beacon_blocks.go b/beacon-chain/sync/validate_beacon_blocks.go index 54ee97ecd405..98fbe2c294a5 100644 --- a/beacon-chain/sync/validate_beacon_blocks.go +++ b/beacon-chain/sync/validate_beacon_blocks.go @@ -26,6 +26,10 @@ import ( "go.opencensus.io/trace" ) +var ( + ErrOptimisticParent = errors.New("parent of the block is optimistic") +) + // validateBeaconBlockPubSub checks that the incoming block has a valid BLS signature. // Blocks that have already been seen are ignored. If the BLS signature is any valid signature, // this method rebroadcasts the message. @@ -162,7 +166,11 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms err = s.validateBeaconBlock(ctx, blk, blockRoot) if err != nil { - return pubsub.ValidationReject, err + // If the parent is optimistic, process the block as usual + // This also does not penalize a peer which sends optimistic blocks + if !errors.Is(ErrOptimisticParent, err) { + return pubsub.ValidationReject, err + } } // Record attribute of valid block. @@ -222,6 +230,10 @@ func (s *Service) validateBeaconBlock(ctx context.Context, blk block.SignedBeaco } if err = s.validateBellatrixBeaconBlock(ctx, parentState, blk.Block()); err != nil { + if errors.Is(err, ErrOptimisticParent) { + return err + } + // for other kinds of errors, set this block as a bad block. s.setBadBlock(ctx, blockRoot) return err } @@ -270,6 +282,14 @@ func (s *Service) validateBellatrixBeaconBlock(ctx context.Context, parentState return errors.New("incorrect timestamp") } + parentRoot := bytesutil.ToBytes32(blk.ParentRoot()) + isParentOptimistic, err := s.cfg.chain.IsOptimisticForRoot(ctx, parentRoot) + if err != nil { + return err + } + if isParentOptimistic { + return ErrOptimisticParent + } return nil } From 9a3002d538d397a141111d8b3be3d3570cbe8817 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Tue, 5 Apr 2022 10:22:44 -0700 Subject: [PATCH 6/8] feedbacks --- beacon-chain/blockchain/optimistic_sync.go | 14 ++++---- beacon-chain/cache/payload_id.go | 32 +++++++++++++------ beacon-chain/cache/payload_id_test.go | 32 +++++++++++++++++-- .../prysm/v1alpha1/validator/assignments.go | 6 ++-- .../validator/proposer_execution_payload.go | 8 ++--- 5 files changed, 68 insertions(+), 24 deletions(-) diff --git a/beacon-chain/blockchain/optimistic_sync.go b/beacon-chain/blockchain/optimistic_sync.go index 9c1b203c2b51..608c9cd649fa 100644 --- a/beacon-chain/blockchain/optimistic_sync.go +++ b/beacon-chain/blockchain/optimistic_sync.go @@ -91,7 +91,9 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headBlk block.Beac return nil, errors.Wrap(err, "could not set block to valid") } if hasAttr { // If the forkchoice update call has an attribute, update the proposer payload ID cache. - s.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(nextSlot, proposerId, bytesutil.BytesToUint64BigEndian(payloadID[:])) + var pId [8]byte + copy(pId[:], payloadID[:]) + s.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(nextSlot, proposerId, pId) } return payloadID, nil } @@ -206,18 +208,18 @@ func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState, // Get fee recipient. feeRecipient := params.BeaconConfig().DefaultFeeRecipient recipient, err := s.cfg.BeaconDB.FeeRecipientByValidatorID(ctx, proposerID) - switch err == nil { - case true: - feeRecipient = recipient - case errors.As(err, kv.ErrNotFoundFeeRecipient): + switch { + case errors.Is(err, kv.ErrNotFoundFeeRecipient): if feeRecipient.String() == fieldparams.EthBurnAddressHex { logrus.WithFields(logrus.Fields{ "validatorIndex": proposerID, "burnAddress": fieldparams.EthBurnAddressHex, }).Error("Fee recipient not set. Using burn address") } - default: + case err != nil: return false, nil, 0, errors.Wrap(err, "could not get fee recipient in db") + default: + feeRecipient = recipient } // Get timestamp. diff --git a/beacon-chain/cache/payload_id.go b/beacon-chain/cache/payload_id.go index 1f2c2b4f3951..1f410595e7b0 100644 --- a/beacon-chain/cache/payload_id.go +++ b/beacon-chain/cache/payload_id.go @@ -26,34 +26,48 @@ func NewProposerPayloadIDsCache() *ProposerPayloadIDsCache { } // GetProposerPayloadIDs returns the proposer and payload IDs for the given slot. -func (f *ProposerPayloadIDsCache) GetProposerPayloadIDs(slot types.Slot) (types.ValidatorIndex, uint64, bool) { +func (f *ProposerPayloadIDsCache) GetProposerPayloadIDs(slot types.Slot) (types.ValidatorIndex, [8]byte, bool) { f.RLock() defer f.RUnlock() ids, ok := f.slotToProposerAndPayloadIDs[slot] if !ok { - return 0, 0, false + return 0, [8]byte{}, false } vId := ids[:vIdLength] - pId := ids[vIdLength:] - return types.ValidatorIndex(bytesutil.BytesToUint64BigEndian(vId)), bytesutil.BytesToUint64BigEndian(pId), true + + b := ids[vIdLength:] + var pId [pIdLength]byte + copy(pId[:], b) + + return types.ValidatorIndex(bytesutil.BytesToUint64BigEndian(vId)), pId, true } // SetProposerAndPayloadIDs sets the proposer and payload IDs for the given slot. -func (f *ProposerPayloadIDsCache) SetProposerAndPayloadIDs(slot types.Slot, vId types.ValidatorIndex, pId uint64) { +func (f *ProposerPayloadIDsCache) SetProposerAndPayloadIDs(slot types.Slot, vId types.ValidatorIndex, pId [8]byte) { f.Lock() defer f.Unlock() var vIdBytes [vIdLength]byte copy(vIdBytes[:], bytesutil.Uint64ToBytesBigEndian(uint64(vId))) - var pIdBytes [pIdLength]byte - copy(pIdBytes[:], bytesutil.Uint64ToBytesBigEndian(pId)) var bytes [vpIdsLength]byte - copy(bytes[:], append(vIdBytes[:], pIdBytes[:]...)) + copy(bytes[:], append(vIdBytes[:], pId[:]...)) _, ok := f.slotToProposerAndPayloadIDs[slot] // Ok to overwrite if the slot is already set but the payload ID is not set. // This combats the re-org case where payload assignment could change the epoch of. - if !ok || (ok && pId != 0) { + if !ok || (ok && pId != [pIdLength]byte{}) { f.slotToProposerAndPayloadIDs[slot] = bytes } } + +// PrunePayloadIDs removes the payload id entries that's current than input slot. +func (f *ProposerPayloadIDsCache) PrunePayloadIDs(slot types.Slot) { + f.Lock() + defer f.Unlock() + + for s := range f.slotToProposerAndPayloadIDs { + if slot > s { + delete(f.slotToProposerAndPayloadIDs, s) + } + } +} diff --git a/beacon-chain/cache/payload_id_test.go b/beacon-chain/cache/payload_id_test.go index b9711d100942..71573409e528 100644 --- a/beacon-chain/cache/payload_id_test.go +++ b/beacon-chain/cache/payload_id_test.go @@ -12,11 +12,11 @@ func TestValidatorPayloadIDsCache_GetAndSaveValidatorPayloadIDs(t *testing.T) { i, p, ok := cache.GetProposerPayloadIDs(0) require.Equal(t, false, ok) require.Equal(t, types.ValidatorIndex(0), i) - require.Equal(t, uint64(0), p) + require.Equal(t, [pIdLength]byte{}, p) slot := types.Slot(1234) vid := types.ValidatorIndex(34234324) - pid := uint64(234234) + pid := [8]byte{1, 2, 3, 3, 7, 8, 7, 8} cache.SetProposerAndPayloadIDs(slot, vid, pid) i, p, ok = cache.GetProposerPayloadIDs(slot) require.Equal(t, true, ok) @@ -25,10 +25,36 @@ func TestValidatorPayloadIDsCache_GetAndSaveValidatorPayloadIDs(t *testing.T) { slot = types.Slot(9456456) vid = types.ValidatorIndex(6786745) - pid = uint64(87687) + cache.SetProposerAndPayloadIDs(slot, vid, [pIdLength]byte{}) + i, p, ok = cache.GetProposerPayloadIDs(slot) + require.Equal(t, true, ok) + require.Equal(t, vid, i) + require.Equal(t, [pIdLength]byte{}, p) + + // reset cache without pid + slot = types.Slot(9456456) + vid = types.ValidatorIndex(11111) + pid = [8]byte{3, 2, 3, 33, 72, 8, 7, 8} cache.SetProposerAndPayloadIDs(slot, vid, pid) i, p, ok = cache.GetProposerPayloadIDs(slot) require.Equal(t, true, ok) require.Equal(t, vid, i) require.Equal(t, pid, p) + + // reset cache with existing pid + slot = types.Slot(9456456) + vid = types.ValidatorIndex(11111) + newPid := [8]byte{1, 2, 3, 33, 72, 8, 7, 1} + cache.SetProposerAndPayloadIDs(slot, vid, newPid) + i, p, ok = cache.GetProposerPayloadIDs(slot) + require.Equal(t, true, ok) + require.Equal(t, vid, i) + require.Equal(t, newPid, p) + + // remove cache entry + cache.PrunePayloadIDs(slot + 1) + i, p, ok = cache.GetProposerPayloadIDs(slot) + require.Equal(t, false, ok) + require.Equal(t, types.ValidatorIndex(0), i) + require.Equal(t, [pIdLength]byte{}, p) } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go index d41e7dabf434..a37310e90cc5 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go @@ -182,12 +182,14 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb. } // Cache proposer assignment for the current epoch. for _, slot := range proposerIndexToSlots[idx] { - vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, idx, 0 /* payloadID */) + vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, idx, [8]byte{} /* payloadID */) } // Cache proposer assignment for the next epoch. for _, slot := range nextProposerIndexToSlots[idx] { - vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, idx, 0 /* payloadID */) + vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, idx, [8]byte{} /* payloadID */) } + // Prune payload ID cache for any slots before request slot. + vs.ProposerSlotIndexCache.PrunePayloadIDs(epochStartSlot) } else { // If the validator isn't in the beacon state, try finding their deposit to determine their status. vStatus, _ := vs.validatorStatus(ctx, s, pubKey) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go index 8bd71dc65e66..684be5286083 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go @@ -43,11 +43,11 @@ var ( // The payload is computed given the respected time of merge. func (vs *Server) getExecutionPayload(ctx context.Context, slot types.Slot, vIdx types.ValidatorIndex) (*enginev1.ExecutionPayload, error) { proposerID, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot) - if ok && proposerID == vIdx && payloadId != 0 { // Payload ID is cache hit. Return the cached payload ID. - var vIdBytes [8]byte - copy(vIdBytes[:], bytesutil.Uint64ToBytesBigEndian(payloadId)) + if ok && proposerID == vIdx && payloadId != [8]byte{} { // Payload ID is cache hit. Return the cached payload ID. + var pid [8]byte + copy(pid[:], payloadId[:]) payloadIDCacheHit.Inc() - return vs.ExecutionEngineCaller.GetPayload(ctx, vIdBytes) + return vs.ExecutionEngineCaller.GetPayload(ctx, pid) } payloadIDCacheMiss.Inc() From 32a4103785cd677dc4f47de07736c0e4e02fe5ed Mon Sep 17 00:00:00 2001 From: terence tsao Date: Tue, 5 Apr 2022 10:30:00 -0700 Subject: [PATCH 7/8] Fix tests --- beacon-chain/blockchain/optimistic_sync_test.go | 4 ++-- beacon-chain/blockchain/receive_attestation_test.go | 4 ++-- .../rpc/prysm/v1alpha1/validator/assignments_test.go | 11 ++++++----- .../validator/proposer_execution_payload_test.go | 2 +- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/beacon-chain/blockchain/optimistic_sync_test.go b/beacon-chain/blockchain/optimistic_sync_test.go index 92f18014bef5..567f426fd0e9 100644 --- a/beacon-chain/blockchain/optimistic_sync_test.go +++ b/beacon-chain/blockchain/optimistic_sync_test.go @@ -602,7 +602,7 @@ func Test_GetPayloadAttribute(t *testing.T) { // Cache hit, advance state, no fee recipient suggestedVid := types.ValidatorIndex(1) slot := types.Slot(1) - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, 0) + service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, [8]byte{}) st, _ := util.DeterministicGenesisState(t, 1) hook := logTest.NewGlobal() hasPayload, attr, vId, err := service.getPayloadAttribute(ctx, st, slot) @@ -615,7 +615,7 @@ func Test_GetPayloadAttribute(t *testing.T) { // Cache hit, advance state, has fee recipient suggestedAddr := common.HexToAddress("123") require.NoError(t, service.cfg.BeaconDB.SaveFeeRecipientsByValidatorIDs(ctx, []types.ValidatorIndex{suggestedVid}, []common.Address{suggestedAddr})) - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, 0) + service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, [8]byte{}) hasPayload, attr, vId, err = service.getPayloadAttribute(ctx, st, slot) require.NoError(t, err) require.Equal(t, true, hasPayload) diff --git a/beacon-chain/blockchain/receive_attestation_test.go b/beacon-chain/blockchain/receive_attestation_test.go index 3bfc3179d911..de0a2c1ade1d 100644 --- a/beacon-chain/blockchain/receive_attestation_test.go +++ b/beacon-chain/blockchain/receive_attestation_test.go @@ -159,7 +159,7 @@ func TestNotifyEngineIfChangedHead(t *testing.T) { block: wsb, state: st, } - service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, 1) + service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, [8]byte{1}) service.store.SetFinalizedCheckpt(finalized) service.notifyEngineIfChangedHead(ctx, [32]byte{'b'}) require.LogsDoNotContain(t, hook, finalizedErr) @@ -167,5 +167,5 @@ func TestNotifyEngineIfChangedHead(t *testing.T) { vId, payloadID, has := service.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(2) require.Equal(t, true, has) require.Equal(t, types.ValidatorIndex(1), vId) - require.Equal(t, uint64(1), payloadID) + require.Equal(t, [8]byte{1}, payloadID) } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go index 6d1665518c39..f575ea6e879b 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go @@ -343,11 +343,12 @@ func TestGetAltairDuties_UnknownPubkey(t *testing.T) { require.NoError(t, err) vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - Eth1InfoFetcher: &mockPOW.POWChain{}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - DepositFetcher: depositCache, + HeadFetcher: chain, + TimeFetcher: chain, + Eth1InfoFetcher: &mockPOW.POWChain{}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + DepositFetcher: depositCache, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } unknownPubkey := bytesutil.PadTo([]byte{'u'}, 48) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go index 57319d3ac0d5..f849df1cfd7b 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go @@ -143,7 +143,7 @@ func TestServer_getExecutionPayload(t *testing.T) { BeaconDB: beaconDB, ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } - vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(tt.st.Slot(), 100, 100) + vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(tt.st.Slot(), 100, [8]byte{100}) _, err := vs.getExecutionPayload(context.Background(), tt.st.Slot(), tt.validatorIndx) if tt.errString != "" { require.ErrorContains(t, tt.errString, err) From da347e4eff67d245550f651ff6d4b25a5df5b20d Mon Sep 17 00:00:00 2001 From: terence tsao Date: Wed, 6 Apr 2022 10:38:02 -0700 Subject: [PATCH 8/8] Refactor state input --- .../blockchain/chain_info_norace_test.go | 15 ++++++++---- beacon-chain/blockchain/head.go | 24 +++++++++---------- beacon-chain/blockchain/head_test.go | 10 ++++---- beacon-chain/blockchain/optimistic_sync.go | 4 ++-- .../blockchain/optimistic_sync_test.go | 3 ++- beacon-chain/blockchain/process_block.go | 10 +++++--- .../blockchain/receive_attestation.go | 8 ++++++- .../blockchain/service_norace_test.go | 5 ++-- 8 files changed, 49 insertions(+), 30 deletions(-) diff --git a/beacon-chain/blockchain/chain_info_norace_test.go b/beacon-chain/blockchain/chain_info_norace_test.go index 48de1808e096..a56367ba1194 100644 --- a/beacon-chain/blockchain/chain_info_norace_test.go +++ b/beacon-chain/blockchain/chain_info_norace_test.go @@ -19,10 +19,11 @@ func TestHeadSlot_DataRace(t *testing.T) { } b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) + st, _ := util.DeterministicGenesisState(t, 1) wait := make(chan struct{}) go func() { defer close(wait) - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) }() s.HeadSlot() <-wait @@ -37,9 +38,11 @@ func TestHeadRoot_DataRace(t *testing.T) { b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) wait := make(chan struct{}) + st, _ := util.DeterministicGenesisState(t, 1) go func() { defer close(wait) - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) + }() _, err = s.HeadRoot(context.Background()) require.NoError(t, err) @@ -57,9 +60,11 @@ func TestHeadBlock_DataRace(t *testing.T) { b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) wait := make(chan struct{}) + st, _ := util.DeterministicGenesisState(t, 1) go func() { defer close(wait) - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) + }() _, err = s.HeadBlock(context.Background()) require.NoError(t, err) @@ -74,9 +79,11 @@ func TestHeadState_DataRace(t *testing.T) { b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) wait := make(chan struct{}) + st, _ := util.DeterministicGenesisState(t, 1) go func() { defer close(wait) - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) + }() _, err = s.HeadState(context.Background()) require.NoError(t, err) diff --git a/beacon-chain/blockchain/head.go b/beacon-chain/blockchain/head.go index 5870c442b46e..44055f723528 100644 --- a/beacon-chain/blockchain/head.go +++ b/beacon-chain/blockchain/head.go @@ -44,7 +44,11 @@ func (s *Service) UpdateAndSaveHeadWithBalances(ctx context.Context) error { if err != nil { return err } - return s.saveHead(ctx, headRoot, headBlock) + headState, err := s.cfg.StateGen.StateByRoot(ctx, headRoot) + if err != nil { + return errors.Wrap(err, "could not retrieve head state in DB") + } + return s.saveHead(ctx, headRoot, headBlock, headState) } // This defines the current chain service's view of head. @@ -101,7 +105,7 @@ func (s *Service) updateHead(ctx context.Context, balances []uint64) ([32]byte, // This saves head info to the local service cache, it also saves the // new head root to the DB. -func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock block.SignedBeaconBlock) error { +func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock block.SignedBeaconBlock, headState state.BeaconState) error { ctx, span := trace.StartSpan(ctx, "blockChain.saveHead") defer span.End() @@ -116,6 +120,9 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock blo if err := helpers.BeaconBlockIsNil(headBlock); err != nil { return err } + if headState == nil || headState.IsNil() { + return errors.New("cannot save nil head state") + } // If the head state is not available, just return nil. // There's nothing to cache @@ -123,15 +130,6 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock blo return nil } - // Get the new head state from cached state or DB. - newHeadState, err := s.cfg.StateGen.StateByRoot(ctx, headRoot) - if err != nil { - return errors.Wrap(err, "could not retrieve head state in DB") - } - if newHeadState == nil || newHeadState.IsNil() { - return errors.New("cannot save nil head state") - } - // A chain re-org occurred, so we fire an event notifying the rest of the services. headSlot := s.HeadSlot() newHeadSlot := headBlock.Block().Slot() @@ -170,7 +168,7 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock blo } // Cache the new head info. - s.setHead(headRoot, headBlock, newHeadState) + s.setHead(headRoot, headBlock, headState) // Save the new head root to DB. if err := s.cfg.BeaconDB.SaveHeadBlockRoot(ctx, headRoot); err != nil { @@ -180,7 +178,7 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock blo // Forward an event capturing a new chain head over a common event feed // done in a goroutine to avoid blocking the critical runtime main routine. go func() { - if err := s.notifyNewHeadEvent(ctx, newHeadSlot, newHeadState, newStateRoot, headRoot[:]); err != nil { + if err := s.notifyNewHeadEvent(ctx, newHeadSlot, headState, newStateRoot, headRoot[:]); err != nil { log.WithError(err).Error("Could not notify event feed of new chain head") } }() diff --git a/beacon-chain/blockchain/head_test.go b/beacon-chain/blockchain/head_test.go index eee3031c7445..fb72596348b9 100644 --- a/beacon-chain/blockchain/head_test.go +++ b/beacon-chain/blockchain/head_test.go @@ -31,7 +31,8 @@ func TestSaveHead_Same(t *testing.T) { service.head = &head{slot: 0, root: r} b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) - require.NoError(t, service.saveHead(context.Background(), r, b)) + st, _ := util.DeterministicGenesisState(t, 1) + require.NoError(t, service.saveHead(context.Background(), r, b, st)) assert.Equal(t, types.Slot(0), service.headSlot(), "Head did not stay the same") assert.Equal(t, r, service.headRoot(), "Head did not stay the same") } @@ -69,7 +70,7 @@ func TestSaveHead_Different(t *testing.T) { require.NoError(t, headState.SetSlot(1)) require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(context.Background(), ðpb.StateSummary{Slot: 1, Root: newRoot[:]})) require.NoError(t, service.cfg.BeaconDB.SaveState(context.Background(), headState, newRoot)) - require.NoError(t, service.saveHead(context.Background(), newRoot, wsb)) + require.NoError(t, service.saveHead(context.Background(), newRoot, wsb, headState)) assert.Equal(t, types.Slot(1), service.HeadSlot(), "Head did not change") @@ -115,7 +116,7 @@ func TestSaveHead_Different_Reorg(t *testing.T) { require.NoError(t, headState.SetSlot(1)) require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(context.Background(), ðpb.StateSummary{Slot: 1, Root: newRoot[:]})) require.NoError(t, service.cfg.BeaconDB.SaveState(context.Background(), headState, newRoot)) - require.NoError(t, service.saveHead(context.Background(), newRoot, wsb)) + require.NoError(t, service.saveHead(context.Background(), newRoot, wsb, headState)) assert.Equal(t, types.Slot(1), service.HeadSlot(), "Head did not change") @@ -159,7 +160,8 @@ func TestUpdateHead_MissingJustifiedRoot(t *testing.T) { service.store.SetBestJustifiedCheckpt(ðpb.Checkpoint{}) headRoot, err := service.updateHead(context.Background(), []uint64{}) require.NoError(t, err) - require.NoError(t, service.saveHead(context.Background(), headRoot, wsb)) + st, _ := util.DeterministicGenesisState(t, 1) + require.NoError(t, service.saveHead(context.Background(), headRoot, wsb, st)) } func Test_notifyNewHeadEvent(t *testing.T) { diff --git a/beacon-chain/blockchain/optimistic_sync.go b/beacon-chain/blockchain/optimistic_sync.go index 5782abf6ade9..19f2510a4efe 100644 --- a/beacon-chain/blockchain/optimistic_sync.go +++ b/beacon-chain/blockchain/optimistic_sync.go @@ -27,7 +27,7 @@ import ( // notifyForkchoiceUpdate signals execution engine the fork choice updates. Execution engine should: // 1. Re-organizes the execution payload chain and corresponding state to make head_block_hash the head. // 2. Applies finality to the execution state: it irreversibly persists the chain of all execution payloads and corresponding state, up to and including finalized_block_hash. -func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headBlk block.BeaconBlock, headRoot [32]byte, finalizedRoot [32]byte) (*enginev1.PayloadIDBytes, error) { +func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headState state.BeaconState, headBlk block.BeaconBlock, headRoot [32]byte, finalizedRoot [32]byte) (*enginev1.PayloadIDBytes, error) { ctx, span := trace.StartSpan(ctx, "blockChain.notifyForkchoiceUpdate") defer span.End() @@ -74,7 +74,7 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headBlk block.Beac } nextSlot := s.CurrentSlot() + 1 // Cache payload ID for next slot proposer. - hasAttr, attr, proposerId, err := s.getPayloadAttribute(ctx, s.headState(ctx), nextSlot) + hasAttr, attr, proposerId, err := s.getPayloadAttribute(ctx, headState, nextSlot) if err != nil { return nil, errors.Wrap(err, "could not get payload attribute") } diff --git a/beacon-chain/blockchain/optimistic_sync_test.go b/beacon-chain/blockchain/optimistic_sync_test.go index 567f426fd0e9..80205934408d 100644 --- a/beacon-chain/blockchain/optimistic_sync_test.go +++ b/beacon-chain/blockchain/optimistic_sync_test.go @@ -166,7 +166,8 @@ func Test_NotifyForkchoiceUpdate(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { service.cfg.ExecutionEngineCaller = &mockPOW.EngineClient{ErrForkchoiceUpdated: tt.newForkchoiceErr} - _, err := service.notifyForkchoiceUpdate(ctx, tt.blk, service.headRoot(), tt.finalizedRoot) + st, _ := util.DeterministicGenesisState(t, 1) + _, err := service.notifyForkchoiceUpdate(ctx, st, tt.blk, service.headRoot(), tt.finalizedRoot) if tt.errString != "" { require.ErrorContains(t, tt.errString, err) } else { diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 508060ef0532..bff01f197716 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -212,10 +212,14 @@ func (s *Service) onBlock(ctx context.Context, signed block.SignedBeaconBlock, b if err != nil { return err } - if _, err := s.notifyForkchoiceUpdate(ctx, headBlock.Block(), headRoot, bytesutil.ToBytes32(finalized.Root)); err != nil { + headState, err := s.cfg.StateGen.StateByRoot(ctx, headRoot) + if err != nil { + return err + } + if _, err := s.notifyForkchoiceUpdate(ctx, headState, headBlock.Block(), headRoot, bytesutil.ToBytes32(finalized.Root)); err != nil { return err } - if err := s.saveHead(ctx, headRoot, headBlock); err != nil { + if err := s.saveHead(ctx, headRoot, headBlock, headState); err != nil { return errors.Wrap(err, "could not save head") } @@ -428,7 +432,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []block.SignedBeaconBlo } } - if _, err := s.notifyForkchoiceUpdate(ctx, b.Block(), blockRoots[i], bytesutil.ToBytes32(fCheckpoints[i].Root)); err != nil { + if _, err := s.notifyForkchoiceUpdate(ctx, preState, b.Block(), blockRoots[i], bytesutil.ToBytes32(fCheckpoints[i].Root)); err != nil { return nil, nil, err } } diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index 6b9fd0705540..430ad2ffdb9a 100644 --- a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -186,7 +186,13 @@ func (s *Service) notifyEngineIfChangedHead(ctx context.Context, newHeadRoot [32 log.WithError(err).Error("Could not get block from db") return } + headState, err := s.cfg.StateGen.StateByRoot(ctx, newHeadRoot) + if err != nil { + log.WithError(err).Error("Could not get state from db") + return + } _, err = s.notifyForkchoiceUpdate(s.ctx, + headState, newHeadBlock.Block(), newHeadRoot, bytesutil.ToBytes32(finalized.Root), @@ -194,7 +200,7 @@ func (s *Service) notifyEngineIfChangedHead(ctx context.Context, newHeadRoot [32 if err != nil { log.WithError(err).Error("could not notify forkchoice update") } - if err := s.saveHead(ctx, newHeadRoot, newHeadBlock); err != nil { + if err := s.saveHead(ctx, newHeadRoot, newHeadBlock, headState); err != nil { log.WithError(err).Error("could not save head") } } diff --git a/beacon-chain/blockchain/service_norace_test.go b/beacon-chain/blockchain/service_norace_test.go index a88ab9ec8ccf..173d2acd6d09 100644 --- a/beacon-chain/blockchain/service_norace_test.go +++ b/beacon-chain/blockchain/service_norace_test.go @@ -23,9 +23,10 @@ func TestChainService_SaveHead_DataRace(t *testing.T) { cfg: &config{BeaconDB: beaconDB}, } b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) + st, _ := util.DeterministicGenesisState(t, 1) require.NoError(t, err) go func() { - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) }() - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) }