diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 973767ef6b2c..21754c1a4dac 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -48,6 +48,7 @@ go_library( "//beacon-chain/core/transition:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/filters:go_default_library", + "//beacon-chain/db/kv:go_default_library", "//beacon-chain/forkchoice:go_default_library", "//beacon-chain/forkchoice/doubly-linked-tree:go_default_library", "//beacon-chain/forkchoice/protoarray:go_default_library", diff --git a/beacon-chain/blockchain/chain_info_norace_test.go b/beacon-chain/blockchain/chain_info_norace_test.go index 48de1808e096..a56367ba1194 100644 --- a/beacon-chain/blockchain/chain_info_norace_test.go +++ b/beacon-chain/blockchain/chain_info_norace_test.go @@ -19,10 +19,11 @@ func TestHeadSlot_DataRace(t *testing.T) { } b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) + st, _ := util.DeterministicGenesisState(t, 1) wait := make(chan struct{}) go func() { defer close(wait) - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) }() s.HeadSlot() <-wait @@ -37,9 +38,11 @@ func TestHeadRoot_DataRace(t *testing.T) { b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) wait := make(chan struct{}) + st, _ := util.DeterministicGenesisState(t, 1) go func() { defer close(wait) - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) + }() _, err = s.HeadRoot(context.Background()) require.NoError(t, err) @@ -57,9 +60,11 @@ func TestHeadBlock_DataRace(t *testing.T) { b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) wait := make(chan struct{}) + st, _ := util.DeterministicGenesisState(t, 1) go func() { defer close(wait) - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) + }() _, err = s.HeadBlock(context.Background()) require.NoError(t, err) @@ -74,9 +79,11 @@ func TestHeadState_DataRace(t *testing.T) { b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) wait := make(chan struct{}) + st, _ := util.DeterministicGenesisState(t, 1) go func() { defer close(wait) - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) + }() _, err = s.HeadState(context.Background()) require.NoError(t, err) diff --git a/beacon-chain/blockchain/head.go b/beacon-chain/blockchain/head.go index 5870c442b46e..44055f723528 100644 --- a/beacon-chain/blockchain/head.go +++ b/beacon-chain/blockchain/head.go @@ -44,7 +44,11 @@ func (s *Service) UpdateAndSaveHeadWithBalances(ctx context.Context) error { if err != nil { return err } - return s.saveHead(ctx, headRoot, headBlock) + headState, err := s.cfg.StateGen.StateByRoot(ctx, headRoot) + if err != nil { + return errors.Wrap(err, "could not retrieve head state in DB") + } + return s.saveHead(ctx, headRoot, headBlock, headState) } // This defines the current chain service's view of head. @@ -101,7 +105,7 @@ func (s *Service) updateHead(ctx context.Context, balances []uint64) ([32]byte, // This saves head info to the local service cache, it also saves the // new head root to the DB. -func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock block.SignedBeaconBlock) error { +func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock block.SignedBeaconBlock, headState state.BeaconState) error { ctx, span := trace.StartSpan(ctx, "blockChain.saveHead") defer span.End() @@ -116,6 +120,9 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock blo if err := helpers.BeaconBlockIsNil(headBlock); err != nil { return err } + if headState == nil || headState.IsNil() { + return errors.New("cannot save nil head state") + } // If the head state is not available, just return nil. // There's nothing to cache @@ -123,15 +130,6 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock blo return nil } - // Get the new head state from cached state or DB. - newHeadState, err := s.cfg.StateGen.StateByRoot(ctx, headRoot) - if err != nil { - return errors.Wrap(err, "could not retrieve head state in DB") - } - if newHeadState == nil || newHeadState.IsNil() { - return errors.New("cannot save nil head state") - } - // A chain re-org occurred, so we fire an event notifying the rest of the services. headSlot := s.HeadSlot() newHeadSlot := headBlock.Block().Slot() @@ -170,7 +168,7 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock blo } // Cache the new head info. - s.setHead(headRoot, headBlock, newHeadState) + s.setHead(headRoot, headBlock, headState) // Save the new head root to DB. if err := s.cfg.BeaconDB.SaveHeadBlockRoot(ctx, headRoot); err != nil { @@ -180,7 +178,7 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock blo // Forward an event capturing a new chain head over a common event feed // done in a goroutine to avoid blocking the critical runtime main routine. go func() { - if err := s.notifyNewHeadEvent(ctx, newHeadSlot, newHeadState, newStateRoot, headRoot[:]); err != nil { + if err := s.notifyNewHeadEvent(ctx, newHeadSlot, headState, newStateRoot, headRoot[:]); err != nil { log.WithError(err).Error("Could not notify event feed of new chain head") } }() diff --git a/beacon-chain/blockchain/head_test.go b/beacon-chain/blockchain/head_test.go index eee3031c7445..fb72596348b9 100644 --- a/beacon-chain/blockchain/head_test.go +++ b/beacon-chain/blockchain/head_test.go @@ -31,7 +31,8 @@ func TestSaveHead_Same(t *testing.T) { service.head = &head{slot: 0, root: r} b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) - require.NoError(t, service.saveHead(context.Background(), r, b)) + st, _ := util.DeterministicGenesisState(t, 1) + require.NoError(t, service.saveHead(context.Background(), r, b, st)) assert.Equal(t, types.Slot(0), service.headSlot(), "Head did not stay the same") assert.Equal(t, r, service.headRoot(), "Head did not stay the same") } @@ -69,7 +70,7 @@ func TestSaveHead_Different(t *testing.T) { require.NoError(t, headState.SetSlot(1)) require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(context.Background(), ðpb.StateSummary{Slot: 1, Root: newRoot[:]})) require.NoError(t, service.cfg.BeaconDB.SaveState(context.Background(), headState, newRoot)) - require.NoError(t, service.saveHead(context.Background(), newRoot, wsb)) + require.NoError(t, service.saveHead(context.Background(), newRoot, wsb, headState)) assert.Equal(t, types.Slot(1), service.HeadSlot(), "Head did not change") @@ -115,7 +116,7 @@ func TestSaveHead_Different_Reorg(t *testing.T) { require.NoError(t, headState.SetSlot(1)) require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(context.Background(), ðpb.StateSummary{Slot: 1, Root: newRoot[:]})) require.NoError(t, service.cfg.BeaconDB.SaveState(context.Background(), headState, newRoot)) - require.NoError(t, service.saveHead(context.Background(), newRoot, wsb)) + require.NoError(t, service.saveHead(context.Background(), newRoot, wsb, headState)) assert.Equal(t, types.Slot(1), service.HeadSlot(), "Head did not change") @@ -159,7 +160,8 @@ func TestUpdateHead_MissingJustifiedRoot(t *testing.T) { service.store.SetBestJustifiedCheckpt(ðpb.Checkpoint{}) headRoot, err := service.updateHead(context.Background(), []uint64{}) require.NoError(t, err) - require.NoError(t, service.saveHead(context.Background(), headRoot, wsb)) + st, _ := util.DeterministicGenesisState(t, 1) + require.NoError(t, service.saveHead(context.Background(), headRoot, wsb, st)) } func Test_notifyNewHeadEvent(t *testing.T) { diff --git a/beacon-chain/blockchain/optimistic_sync.go b/beacon-chain/blockchain/optimistic_sync.go index 20885e3081a5..53cf096b5684 100644 --- a/beacon-chain/blockchain/optimistic_sync.go +++ b/beacon-chain/blockchain/optimistic_sync.go @@ -5,14 +5,21 @@ import ( "fmt" "github.com/pkg/errors" + types "github.com/prysmaticlabs/eth2-types" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/core/time" + "github.com/prysmaticlabs/prysm/beacon-chain/core/transition" + "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" "github.com/prysmaticlabs/prysm/beacon-chain/powchain" + "github.com/prysmaticlabs/prysm/beacon-chain/state" + fieldparams "github.com/prysmaticlabs/prysm/config/fieldparams" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/encoding/bytesutil" enginev1 "github.com/prysmaticlabs/prysm/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block" + "github.com/prysmaticlabs/prysm/time/slots" "github.com/sirupsen/logrus" "go.opencensus.io/trace" ) @@ -20,7 +27,7 @@ import ( // notifyForkchoiceUpdate signals execution engine the fork choice updates. Execution engine should: // 1. Re-organizes the execution payload chain and corresponding state to make head_block_hash the head. // 2. Applies finality to the execution state: it irreversibly persists the chain of all execution payloads and corresponding state, up to and including finalized_block_hash. -func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headBlk block.BeaconBlock, headRoot [32]byte, finalizedRoot [32]byte) (*enginev1.PayloadIDBytes, error) { +func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headState state.BeaconState, headBlk block.BeaconBlock, headRoot [32]byte, finalizedRoot [32]byte) (*enginev1.PayloadIDBytes, error) { ctx, span := trace.StartSpan(ctx, "blockChain.notifyForkchoiceUpdate") defer span.End() @@ -66,8 +73,13 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headBlk block.Beac FinalizedBlockHash: finalizedHash, } - // payload attribute is only required when requesting payload, here we are just updating fork choice, so it is nil. - payloadID, _, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, nil /*payload attribute*/) + nextSlot := s.CurrentSlot() + 1 // Cache payload ID for next slot proposer. + hasAttr, attr, proposerId, err := s.getPayloadAttribute(ctx, headState, nextSlot) + if err != nil { + return nil, errors.Wrap(err, "could not get payload attribute") + } + + payloadID, _, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, attr) if err != nil { switch err { case powchain.ErrAcceptedSyncingPayloadStatus: @@ -84,6 +96,11 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, headBlk block.Beac if err := s.cfg.ForkChoiceStore.SetOptimisticToValid(ctx, headRoot); err != nil { return nil, errors.Wrap(err, "could not set block to valid") } + if hasAttr { // If the forkchoice update call has an attribute, update the proposer payload ID cache. + var pId [8]byte + copy(pId[:], payloadID[:]) + s.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(nextSlot, proposerId, pId) + } return payloadID, nil } @@ -188,6 +205,55 @@ func (s *Service) optimisticCandidateBlock(ctx context.Context, blk block.Beacon return parentIsExecutionBlock, nil } +// getPayloadAttributes returns the payload attributes for the given state and slot. +// The attribute is required to initiate a payload build process in the context of an `engine_forkchoiceUpdated` call. +func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState, slot types.Slot) (bool, *enginev1.PayloadAttributes, types.ValidatorIndex, error) { + proposerID, _, ok := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(slot) + if !ok { // There's no need to build attribute if there is no proposer for slot. + return false, nil, 0, nil + } + + // Get previous randao. + st = st.Copy() + st, err := transition.ProcessSlotsIfPossible(ctx, st, slot) + if err != nil { + return false, nil, 0, err + } + prevRando, err := helpers.RandaoMix(st, time.CurrentEpoch(st)) + if err != nil { + return false, nil, 0, nil + } + + // Get fee recipient. + feeRecipient := params.BeaconConfig().DefaultFeeRecipient + recipient, err := s.cfg.BeaconDB.FeeRecipientByValidatorID(ctx, proposerID) + switch { + case errors.Is(err, kv.ErrNotFoundFeeRecipient): + if feeRecipient.String() == fieldparams.EthBurnAddressHex { + logrus.WithFields(logrus.Fields{ + "validatorIndex": proposerID, + "burnAddress": fieldparams.EthBurnAddressHex, + }).Error("Fee recipient not set. Using burn address") + } + case err != nil: + return false, nil, 0, errors.Wrap(err, "could not get fee recipient in db") + default: + feeRecipient = recipient + } + + // Get timestamp. + t, err := slots.ToTime(uint64(s.genesisTime.Unix()), slot) + if err != nil { + return false, nil, 0, err + } + attr := &enginev1.PayloadAttributes{ + Timestamp: uint64(t.Unix()), + PrevRandao: prevRando, + SuggestedFeeRecipient: feeRecipient.Bytes(), + } + return true, attr, proposerID, nil +} + // removeInvalidBlockAndState removes the invalid block and its corresponding state from the cache and DB. func (s *Service) removeInvalidBlockAndState(ctx context.Context, blkRoots [][32]byte) error { for _, root := range blkRoots { diff --git a/beacon-chain/blockchain/optimistic_sync_test.go b/beacon-chain/blockchain/optimistic_sync_test.go index c964bcf12e65..0d98a0f80ab8 100644 --- a/beacon-chain/blockchain/optimistic_sync_test.go +++ b/beacon-chain/blockchain/optimistic_sync_test.go @@ -5,6 +5,9 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/common" + types "github.com/prysmaticlabs/eth2-types" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/beacon-chain/forkchoice/protoarray" @@ -24,6 +27,7 @@ import ( "github.com/prysmaticlabs/prysm/testing/require" "github.com/prysmaticlabs/prysm/testing/util" "github.com/prysmaticlabs/prysm/time/slots" + logTest "github.com/sirupsen/logrus/hooks/test" ) func Test_NotifyForkchoiceUpdate(t *testing.T) { @@ -44,8 +48,13 @@ func Test_NotifyForkchoiceUpdate(t *testing.T) { WithDatabase(beaconDB), WithStateGen(stategen.New(beaconDB)), WithForkChoiceStore(fcs), + WithProposerIdsCache(cache.NewProposerPayloadIDsCache()), } service, err := NewService(ctx, opts...) + st, _ := util.DeterministicGenesisState(t, 1) + service.head = &head{ + state: st, + } require.NoError(t, err) require.NoError(t, fcs.InsertOptimisticBlock(ctx, 0, [32]byte{}, [32]byte{}, params.BeaconConfig().ZeroHash, 0, 0)) @@ -157,7 +166,8 @@ func Test_NotifyForkchoiceUpdate(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { service.cfg.ExecutionEngineCaller = &mockPOW.EngineClient{ErrForkchoiceUpdated: tt.newForkchoiceErr} - _, err := service.notifyForkchoiceUpdate(ctx, tt.blk, service.headRoot(), tt.finalizedRoot) + st, _ := util.DeterministicGenesisState(t, 1) + _, err := service.notifyForkchoiceUpdate(ctx, st, tt.blk, service.headRoot(), tt.finalizedRoot) if tt.errString != "" { require.ErrorContains(t, tt.errString, err) } else { @@ -588,6 +598,47 @@ func Test_IsOptimisticShallowExecutionParent(t *testing.T) { require.Equal(t, true, candidate) } +func Test_GetPayloadAttribute(t *testing.T) { + ctx := context.Background() + beaconDB := testDB.SetupDB(t) + opts := []Option{ + WithDatabase(beaconDB), + WithStateGen(stategen.New(beaconDB)), + WithProposerIdsCache(cache.NewProposerPayloadIDsCache()), + } + + // Cache miss + service, err := NewService(ctx, opts...) + require.NoError(t, err) + hasPayload, _, vId, err := service.getPayloadAttribute(ctx, nil, 0) + require.NoError(t, err) + require.Equal(t, false, hasPayload) + require.Equal(t, types.ValidatorIndex(0), vId) + + // Cache hit, advance state, no fee recipient + suggestedVid := types.ValidatorIndex(1) + slot := types.Slot(1) + service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, [8]byte{}) + st, _ := util.DeterministicGenesisState(t, 1) + hook := logTest.NewGlobal() + hasPayload, attr, vId, err := service.getPayloadAttribute(ctx, st, slot) + require.NoError(t, err) + require.Equal(t, true, hasPayload) + require.Equal(t, suggestedVid, vId) + require.Equal(t, fieldparams.EthBurnAddressHex, common.BytesToAddress(attr.SuggestedFeeRecipient).String()) + require.LogsContain(t, hook, "Fee recipient not set. Using burn address") + + // Cache hit, advance state, has fee recipient + suggestedAddr := common.HexToAddress("123") + require.NoError(t, service.cfg.BeaconDB.SaveFeeRecipientsByValidatorIDs(ctx, []types.ValidatorIndex{suggestedVid}, []common.Address{suggestedAddr})) + service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, suggestedVid, [8]byte{}) + hasPayload, attr, vId, err = service.getPayloadAttribute(ctx, st, slot) + require.NoError(t, err) + require.Equal(t, true, hasPayload) + require.Equal(t, suggestedVid, vId) + require.Equal(t, suggestedAddr, common.BytesToAddress(attr.SuggestedFeeRecipient)) +} + func Test_UpdateLastValidatedCheckpoint(t *testing.T) { params.SetupTestConfigCleanup(t) params.OverrideBeaconConfig(params.MainnetConfig()) diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index f0ffa13b2073..4496d5116571 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -2,6 +2,7 @@ package blockchain import ( "github.com/prysmaticlabs/prysm/async/event" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" @@ -66,6 +67,14 @@ func WithDepositCache(c *depositcache.DepositCache) Option { } } +// WithProposerIdsCache for proposer id cache. +func WithProposerIdsCache(c *cache.ProposerPayloadIDsCache) Option { + return func(s *Service) error { + s.cfg.ProposerSlotIndexCache = c + return nil + } +} + // WithAttestationPool for attestation lifecycle after chain inclusion. func WithAttestationPool(p attestations.Pool) Option { return func(s *Service) error { diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 508060ef0532..bff01f197716 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -212,10 +212,14 @@ func (s *Service) onBlock(ctx context.Context, signed block.SignedBeaconBlock, b if err != nil { return err } - if _, err := s.notifyForkchoiceUpdate(ctx, headBlock.Block(), headRoot, bytesutil.ToBytes32(finalized.Root)); err != nil { + headState, err := s.cfg.StateGen.StateByRoot(ctx, headRoot) + if err != nil { + return err + } + if _, err := s.notifyForkchoiceUpdate(ctx, headState, headBlock.Block(), headRoot, bytesutil.ToBytes32(finalized.Root)); err != nil { return err } - if err := s.saveHead(ctx, headRoot, headBlock); err != nil { + if err := s.saveHead(ctx, headRoot, headBlock, headState); err != nil { return errors.Wrap(err, "could not save head") } @@ -428,7 +432,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []block.SignedBeaconBlo } } - if _, err := s.notifyForkchoiceUpdate(ctx, b.Block(), blockRoots[i], bytesutil.ToBytes32(fCheckpoints[i].Root)); err != nil { + if _, err := s.notifyForkchoiceUpdate(ctx, preState, b.Block(), blockRoots[i], bytesutil.ToBytes32(fCheckpoints[i].Root)); err != nil { return nil, nil, err } } diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index 6b9fd0705540..430ad2ffdb9a 100644 --- a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -186,7 +186,13 @@ func (s *Service) notifyEngineIfChangedHead(ctx context.Context, newHeadRoot [32 log.WithError(err).Error("Could not get block from db") return } + headState, err := s.cfg.StateGen.StateByRoot(ctx, newHeadRoot) + if err != nil { + log.WithError(err).Error("Could not get state from db") + return + } _, err = s.notifyForkchoiceUpdate(s.ctx, + headState, newHeadBlock.Block(), newHeadRoot, bytesutil.ToBytes32(finalized.Root), @@ -194,7 +200,7 @@ func (s *Service) notifyEngineIfChangedHead(ctx context.Context, newHeadRoot [32 if err != nil { log.WithError(err).Error("could not notify forkchoice update") } - if err := s.saveHead(ctx, newHeadRoot, newHeadBlock); err != nil { + if err := s.saveHead(ctx, newHeadRoot, newHeadBlock, headState); err != nil { log.WithError(err).Error("could not save head") } } diff --git a/beacon-chain/blockchain/receive_attestation_test.go b/beacon-chain/blockchain/receive_attestation_test.go index 7b41ef6d016c..43c44d3c1996 100644 --- a/beacon-chain/blockchain/receive_attestation_test.go +++ b/beacon-chain/blockchain/receive_attestation_test.go @@ -6,6 +6,7 @@ import ( "time" types "github.com/prysmaticlabs/eth2-types" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/transition" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" @@ -133,7 +134,7 @@ func TestNotifyEngineIfChangedHead(t *testing.T) { service, err := NewService(ctx, opts...) require.NoError(t, err) - + service.cfg.ProposerSlotIndexCache = cache.NewProposerPayloadIDsCache() service.notifyEngineIfChangedHead(ctx, service.headRoot()) hookErr := "could not notify forkchoice update" finalizedErr := "could not get finalized checkpoint" @@ -156,9 +157,20 @@ func TestNotifyEngineIfChangedHead(t *testing.T) { r1, err := b.Block.HashTreeRoot() require.NoError(t, err) finalized := ðpb.Checkpoint{Root: r1[:], Epoch: 0} - + st, _ := util.DeterministicGenesisState(t, 1) + service.head = &head{ + slot: 1, + root: r1, + block: wsb, + state: st, + } + service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, [8]byte{1}) service.store.SetFinalizedCheckpt(finalized) service.notifyEngineIfChangedHead(ctx, r1) require.LogsDoNotContain(t, hook, finalizedErr) require.LogsDoNotContain(t, hook, hookErr) + vId, payloadID, has := service.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(2) + require.Equal(t, true, has) + require.Equal(t, types.ValidatorIndex(1), vId) + require.Equal(t, [8]byte{1}, payloadID) } diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 0689441e65c6..e2e3e64d588c 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -74,6 +74,7 @@ type config struct { ChainStartFetcher powchain.ChainStartFetcher BeaconDB db.HeadAccessDatabase DepositCache *depositcache.DepositCache + ProposerSlotIndexCache *cache.ProposerPayloadIDsCache AttPool attestations.Pool ExitPool voluntaryexits.PoolManager SlashingPool slashings.PoolManager diff --git a/beacon-chain/blockchain/service_norace_test.go b/beacon-chain/blockchain/service_norace_test.go index a88ab9ec8ccf..173d2acd6d09 100644 --- a/beacon-chain/blockchain/service_norace_test.go +++ b/beacon-chain/blockchain/service_norace_test.go @@ -23,9 +23,10 @@ func TestChainService_SaveHead_DataRace(t *testing.T) { cfg: &config{BeaconDB: beaconDB}, } b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock()) + st, _ := util.DeterministicGenesisState(t, 1) require.NoError(t, err) go func() { - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) }() - require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b)) + require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b, st)) } diff --git a/beacon-chain/cache/BUILD.bazel b/beacon-chain/cache/BUILD.bazel index b07500d5f255..4f74800ec65e 100644 --- a/beacon-chain/cache/BUILD.bazel +++ b/beacon-chain/cache/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "common.go", "doc.go", "error.go", + "payload_id.go", "proposer_indices.go", "proposer_indices_disabled.go", # keep "proposer_indices_type.go", @@ -26,6 +27,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/beacon-chain/cache", visibility = [ "//beacon-chain:__subpackages__", + "//testing/spectest:__subpackages__", "//tools:__subpackages__", ], deps = [ @@ -61,6 +63,7 @@ go_test( "checkpoint_state_test.go", "committee_fuzz_test.go", "committee_test.go", + "payload_id_test.go", "proposer_indices_test.go", "skip_slot_cache_test.go", "subnet_ids_test.go", diff --git a/beacon-chain/cache/payload_id.go b/beacon-chain/cache/payload_id.go new file mode 100644 index 000000000000..1f410595e7b0 --- /dev/null +++ b/beacon-chain/cache/payload_id.go @@ -0,0 +1,73 @@ +package cache + +import ( + "sync" + + types "github.com/prysmaticlabs/eth2-types" + "github.com/prysmaticlabs/prysm/encoding/bytesutil" +) + +const vIdLength = 8 +const pIdLength = 8 +const vpIdsLength = vIdLength + pIdLength + +// ProposerPayloadIDsCache is a cache of proposer payload IDs. +// The key is the slot. The value is the concatenation of the proposer and payload IDs. 8 bytes each. +type ProposerPayloadIDsCache struct { + slotToProposerAndPayloadIDs map[types.Slot][vpIdsLength]byte + sync.RWMutex +} + +// NewProposerPayloadIDsCache creates a new proposer payload IDs cache. +func NewProposerPayloadIDsCache() *ProposerPayloadIDsCache { + return &ProposerPayloadIDsCache{ + slotToProposerAndPayloadIDs: make(map[types.Slot][vpIdsLength]byte), + } +} + +// GetProposerPayloadIDs returns the proposer and payload IDs for the given slot. +func (f *ProposerPayloadIDsCache) GetProposerPayloadIDs(slot types.Slot) (types.ValidatorIndex, [8]byte, bool) { + f.RLock() + defer f.RUnlock() + ids, ok := f.slotToProposerAndPayloadIDs[slot] + if !ok { + return 0, [8]byte{}, false + } + vId := ids[:vIdLength] + + b := ids[vIdLength:] + var pId [pIdLength]byte + copy(pId[:], b) + + return types.ValidatorIndex(bytesutil.BytesToUint64BigEndian(vId)), pId, true +} + +// SetProposerAndPayloadIDs sets the proposer and payload IDs for the given slot. +func (f *ProposerPayloadIDsCache) SetProposerAndPayloadIDs(slot types.Slot, vId types.ValidatorIndex, pId [8]byte) { + f.Lock() + defer f.Unlock() + var vIdBytes [vIdLength]byte + copy(vIdBytes[:], bytesutil.Uint64ToBytesBigEndian(uint64(vId))) + + var bytes [vpIdsLength]byte + copy(bytes[:], append(vIdBytes[:], pId[:]...)) + + _, ok := f.slotToProposerAndPayloadIDs[slot] + // Ok to overwrite if the slot is already set but the payload ID is not set. + // This combats the re-org case where payload assignment could change the epoch of. + if !ok || (ok && pId != [pIdLength]byte{}) { + f.slotToProposerAndPayloadIDs[slot] = bytes + } +} + +// PrunePayloadIDs removes the payload id entries that's current than input slot. +func (f *ProposerPayloadIDsCache) PrunePayloadIDs(slot types.Slot) { + f.Lock() + defer f.Unlock() + + for s := range f.slotToProposerAndPayloadIDs { + if slot > s { + delete(f.slotToProposerAndPayloadIDs, s) + } + } +} diff --git a/beacon-chain/cache/payload_id_test.go b/beacon-chain/cache/payload_id_test.go new file mode 100644 index 000000000000..71573409e528 --- /dev/null +++ b/beacon-chain/cache/payload_id_test.go @@ -0,0 +1,60 @@ +package cache + +import ( + "testing" + + types "github.com/prysmaticlabs/eth2-types" + "github.com/prysmaticlabs/prysm/testing/require" +) + +func TestValidatorPayloadIDsCache_GetAndSaveValidatorPayloadIDs(t *testing.T) { + cache := NewProposerPayloadIDsCache() + i, p, ok := cache.GetProposerPayloadIDs(0) + require.Equal(t, false, ok) + require.Equal(t, types.ValidatorIndex(0), i) + require.Equal(t, [pIdLength]byte{}, p) + + slot := types.Slot(1234) + vid := types.ValidatorIndex(34234324) + pid := [8]byte{1, 2, 3, 3, 7, 8, 7, 8} + cache.SetProposerAndPayloadIDs(slot, vid, pid) + i, p, ok = cache.GetProposerPayloadIDs(slot) + require.Equal(t, true, ok) + require.Equal(t, vid, i) + require.Equal(t, pid, p) + + slot = types.Slot(9456456) + vid = types.ValidatorIndex(6786745) + cache.SetProposerAndPayloadIDs(slot, vid, [pIdLength]byte{}) + i, p, ok = cache.GetProposerPayloadIDs(slot) + require.Equal(t, true, ok) + require.Equal(t, vid, i) + require.Equal(t, [pIdLength]byte{}, p) + + // reset cache without pid + slot = types.Slot(9456456) + vid = types.ValidatorIndex(11111) + pid = [8]byte{3, 2, 3, 33, 72, 8, 7, 8} + cache.SetProposerAndPayloadIDs(slot, vid, pid) + i, p, ok = cache.GetProposerPayloadIDs(slot) + require.Equal(t, true, ok) + require.Equal(t, vid, i) + require.Equal(t, pid, p) + + // reset cache with existing pid + slot = types.Slot(9456456) + vid = types.ValidatorIndex(11111) + newPid := [8]byte{1, 2, 3, 33, 72, 8, 7, 1} + cache.SetProposerAndPayloadIDs(slot, vid, newPid) + i, p, ok = cache.GetProposerPayloadIDs(slot) + require.Equal(t, true, ok) + require.Equal(t, vid, i) + require.Equal(t, newPid, p) + + // remove cache entry + cache.PrunePayloadIDs(slot + 1) + i, p, ok = cache.GetProposerPayloadIDs(slot) + require.Equal(t, false, ok) + require.Equal(t, types.ValidatorIndex(0), i) + require.Equal(t, [pIdLength]byte{}, p) +} diff --git a/beacon-chain/core/helpers/beacon_committee.go b/beacon-chain/core/helpers/beacon_committee.go index d77d0f5725b9..894d4ea80c75 100644 --- a/beacon-chain/core/helpers/beacon_committee.go +++ b/beacon-chain/core/helpers/beacon_committee.go @@ -175,9 +175,7 @@ func CommitteeAssignments( return nil, nil, err } proposerIndexToSlots := make(map[types.ValidatorIndex][]types.Slot, params.BeaconConfig().SlotsPerEpoch) - // Proposal epochs do not have a look ahead, so we skip them over here. - validProposalEpoch := epoch < nextEpoch - for slot := startSlot; slot < startSlot+params.BeaconConfig().SlotsPerEpoch && validProposalEpoch; slot++ { + for slot := startSlot; slot < startSlot+params.BeaconConfig().SlotsPerEpoch; slot++ { // Skip proposer assignment for genesis slot. if slot == 0 { continue @@ -192,6 +190,15 @@ func CommitteeAssignments( proposerIndexToSlots[i] = append(proposerIndexToSlots[i], slot) } + // If previous proposer indices computation is outside if current proposal epoch range, + // we need to reset state slot back to start slot so that we can compute the correct committees. + currentProposalEpoch := epoch < nextEpoch + if !currentProposalEpoch { + if err := state.SetSlot(state.Slot() - params.BeaconConfig().SlotsPerEpoch); err != nil { + return nil, nil, err + } + } + activeValidatorIndices, err := ActiveValidatorIndices(ctx, state, epoch) if err != nil { return nil, nil, err diff --git a/beacon-chain/core/helpers/beacon_committee_test.go b/beacon-chain/core/helpers/beacon_committee_test.go index a7bb6d286f69..14030c969fe9 100644 --- a/beacon-chain/core/helpers/beacon_committee_test.go +++ b/beacon-chain/core/helpers/beacon_committee_test.go @@ -232,7 +232,7 @@ func TestCommitteeAssignments_CannotRetrieveFuture(t *testing.T) { _, proposerIndxs, err = CommitteeAssignments(context.Background(), state, time.CurrentEpoch(state)+1) require.NoError(t, err) - require.Equal(t, 0, len(proposerIndxs), "wanted empty proposer index set") + require.NotEqual(t, 0, len(proposerIndxs), "wanted non-zero proposer index set") } func TestCommitteeAssignments_EverySlotHasMin1Proposer(t *testing.T) { diff --git a/beacon-chain/node/BUILD.bazel b/beacon-chain/node/BUILD.bazel index 54fedc626b8c..eece8797176e 100644 --- a/beacon-chain/node/BUILD.bazel +++ b/beacon-chain/node/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//api/gateway:go_default_library", "//async/event:go_default_library", "//beacon-chain/blockchain:go_default_library", + "//beacon-chain/cache:go_default_library", "//beacon-chain/cache/depositcache:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/kv:go_default_library", diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index bf5ec0969109..4e6ba1ef9a98 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -21,6 +21,7 @@ import ( apigateway "github.com/prysmaticlabs/prysm/api/gateway" "github.com/prysmaticlabs/prysm/async/event" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" @@ -94,6 +95,7 @@ type BeaconNode struct { slashingsPool slashings.PoolManager syncCommitteePool synccommittee.Pool depositCache *depositcache.DepositCache + proposerIdsCache *cache.ProposerPayloadIDsCache stateFeed *event.Feed blockFeed *event.Feed opFeed *event.Feed @@ -152,6 +154,7 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) { slasherBlockHeadersFeed: new(event.Feed), slasherAttestationsFeed: new(event.Feed), serviceFlagOpts: &serviceFlagOpts{}, + proposerIdsCache: cache.NewProposerPayloadIDsCache(), } for _, opt := range opts { @@ -585,6 +588,7 @@ func (b *BeaconNode) registerBlockchainService() error { blockchain.WithStateGen(b.stateGen), blockchain.WithSlasherAttestationsFeed(b.slasherAttestationsFeed), blockchain.WithFinalizedStateAtStartUp(b.finalizedStateAtStartUp), + blockchain.WithProposerIdsCache(b.proposerIdsCache), ) blockchainService, err := blockchain.NewService(b.ctx, opts...) if err != nil { @@ -801,6 +805,7 @@ func (b *BeaconNode) registerRPCService() error { StateGen: b.stateGen, EnableDebugRPCEndpoints: enableDebugRPCEndpoints, MaxMsgSize: maxMsgSize, + ProposerIdsCache: b.proposerIdsCache, ExecutionEngineCaller: web3Service, }) diff --git a/beacon-chain/rpc/eth/validator/validator_test.go b/beacon-chain/rpc/eth/validator/validator_test.go index c65ac8ae53fa..41d863adfb04 100644 --- a/beacon-chain/rpc/eth/validator/validator_test.go +++ b/beacon-chain/rpc/eth/validator/validator_test.go @@ -1024,19 +1024,20 @@ func TestProduceBlockV2(t *testing.T) { TotalDifficulty: "0x1", }, }, - TimeFetcher: &mockChain.ChainService{}, - HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - BlockReceiver: &mockChain.ChainService{}, - ChainStartFetcher: &mockPOW.POWChain{}, - Eth1InfoFetcher: &mockPOW.POWChain{}, - Eth1BlockFetcher: &mockPOW.POWChain{}, - MockEth1Votes: true, - AttPool: attestations.NewPool(), - SlashingsPool: slashings.NewPool(), - ExitPool: voluntaryexits.NewPool(), - StateGen: stategen.New(db), - SyncCommitteePool: synccommittee.NewStore(), + TimeFetcher: &mockChain.ChainService{}, + HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + BlockReceiver: &mockChain.ChainService{}, + ChainStartFetcher: &mockPOW.POWChain{}, + Eth1InfoFetcher: &mockPOW.POWChain{}, + Eth1BlockFetcher: &mockPOW.POWChain{}, + MockEth1Votes: true, + AttPool: attestations.NewPool(), + SlashingsPool: slashings.NewPool(), + ExitPool: voluntaryexits.NewPool(), + StateGen: stategen.New(db), + SyncCommitteePool: synccommittee.NewStore(), + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } proposerSlashings := make([]*ethpbalpha.ProposerSlashing, params.BeaconConfig().MaxProposerSlashings) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel b/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel index a323f86b6c32..0cfe1e528b43 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/BUILD.bazel @@ -79,6 +79,8 @@ go_library( "@com_github_ferranbt_fastssz//:go_default_library", "@com_github_holiman_uint256//:go_default_library", "@com_github_pkg_errors//:go_default_library", + "@com_github_prometheus_client_golang//prometheus:go_default_library", + "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_prysmaticlabs_eth2_types//:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go index 228ffef8d0bc..a37310e90cc5 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments.go @@ -138,7 +138,7 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb. return nil, status.Errorf(codes.Internal, "Could not compute committee assignments: %v", err) } // Query the next epoch assignments for committee subnet subscriptions. - nextCommitteeAssignments, _, err := helpers.CommitteeAssignments(ctx, s, req.Epoch+1) + nextCommitteeAssignments, nextProposerIndexToSlots, err := helpers.CommitteeAssignments(ctx, s, req.Epoch+1) if err != nil { return nil, status.Errorf(codes.Internal, "Could not compute next committee assignments: %v", err) } @@ -180,6 +180,16 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb. nextAssignment.AttesterSlot = ca.AttesterSlot nextAssignment.CommitteeIndex = ca.CommitteeIndex } + // Cache proposer assignment for the current epoch. + for _, slot := range proposerIndexToSlots[idx] { + vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, idx, [8]byte{} /* payloadID */) + } + // Cache proposer assignment for the next epoch. + for _, slot := range nextProposerIndexToSlots[idx] { + vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(slot, idx, [8]byte{} /* payloadID */) + } + // Prune payload ID cache for any slots before request slot. + vs.ProposerSlotIndexCache.PrunePayloadIDs(epochStartSlot) } else { // If the validator isn't in the beacon state, try finding their deposit to determine their status. vStatus, _ := vs.validatorStatus(ctx, s, pubKey) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go index b6ecb54d205c..f575ea6e879b 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/assignments_test.go @@ -60,9 +60,10 @@ func TestGetDuties_OK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now(), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, + HeadFetcher: chain, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. @@ -144,10 +145,11 @@ func TestGetAltairDuties_SyncCommitteeOK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now().Add(time.Duration(-1*int64(slot-1)) * time.Second), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - Eth1InfoFetcher: &mockPOW.POWChain{}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, + HeadFetcher: chain, + TimeFetcher: chain, + Eth1InfoFetcher: &mockPOW.POWChain{}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. @@ -181,12 +183,12 @@ func TestGetAltairDuties_SyncCommitteeOK(t *testing.T) { res, err = vs.GetDuties(context.Background(), req) require.NoError(t, err, "Could not call epoch committee assignment") for i := 0; i < len(res.CurrentEpochDuties); i++ { - assert.Equal(t, types.ValidatorIndex(i), res.CurrentEpochDuties[i].ValidatorIndex) + require.Equal(t, types.ValidatorIndex(i), res.CurrentEpochDuties[i].ValidatorIndex) } for i := 0; i < len(res.CurrentEpochDuties); i++ { - assert.Equal(t, true, res.CurrentEpochDuties[i].IsSyncCommittee) + require.Equal(t, true, res.CurrentEpochDuties[i].IsSyncCommittee) // Current epoch and next epoch duties should be equal before the sync period epoch boundary. - assert.Equal(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) + require.Equal(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) } // Current epoch and next epoch duties should not be equal at the sync period epoch boundary. @@ -197,7 +199,7 @@ func TestGetAltairDuties_SyncCommitteeOK(t *testing.T) { res, err = vs.GetDuties(context.Background(), req) require.NoError(t, err, "Could not call epoch committee assignment") for i := 0; i < len(res.CurrentEpochDuties); i++ { - assert.NotEqual(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) + require.NotEqual(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) } } @@ -249,10 +251,11 @@ func TestGetBellatrixDuties_SyncCommitteeOK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now().Add(time.Duration(-1*int64(slot-1)) * time.Second), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - Eth1InfoFetcher: &mockPOW.POWChain{}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, + HeadFetcher: chain, + TimeFetcher: chain, + Eth1InfoFetcher: &mockPOW.POWChain{}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. @@ -302,7 +305,7 @@ func TestGetBellatrixDuties_SyncCommitteeOK(t *testing.T) { res, err = vs.GetDuties(context.Background(), req) require.NoError(t, err, "Could not call epoch committee assignment") for i := 0; i < len(res.CurrentEpochDuties); i++ { - assert.NotEqual(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) + require.NotEqual(t, res.CurrentEpochDuties[i].IsSyncCommittee, res.NextEpochDuties[i].IsSyncCommittee) } } @@ -340,11 +343,12 @@ func TestGetAltairDuties_UnknownPubkey(t *testing.T) { require.NoError(t, err) vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - Eth1InfoFetcher: &mockPOW.POWChain{}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - DepositFetcher: depositCache, + HeadFetcher: chain, + TimeFetcher: chain, + Eth1InfoFetcher: &mockPOW.POWChain{}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + DepositFetcher: depositCache, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } unknownPubkey := bytesutil.PadTo([]byte{'u'}, 48) @@ -399,9 +403,10 @@ func TestGetDuties_CurrentEpoch_ShouldNotFail(t *testing.T) { State: bState, Root: genesisRoot[:], Genesis: time.Now(), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, + HeadFetcher: chain, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. @@ -437,9 +442,10 @@ func TestGetDuties_MultipleKeys_OK(t *testing.T) { State: bs, Root: genesisRoot[:], Genesis: time.Now(), } vs := &Server{ - HeadFetcher: chain, - TimeFetcher: chain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, + HeadFetcher: chain, + TimeFetcher: chain, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } pubkey0 := deposits[0].Data.PublicKey @@ -503,11 +509,12 @@ func TestStreamDuties_OK(t *testing.T) { Genesis: time.Now(), } vs := &Server{ - Ctx: ctx, - HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - TimeFetcher: c, - StateNotifier: &mockChain.MockStateNotifier{}, + Ctx: ctx, + HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + TimeFetcher: c, + StateNotifier: &mockChain.MockStateNotifier{}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. @@ -560,11 +567,12 @@ func TestStreamDuties_OK_ChainReorg(t *testing.T) { Genesis: time.Now(), } vs := &Server{ - Ctx: ctx, - HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - TimeFetcher: c, - StateNotifier: &mockChain.MockStateNotifier{}, + Ctx: ctx, + HeadFetcher: &mockChain.ChainService{State: bs, Root: genesisRoot[:]}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + TimeFetcher: c, + StateNotifier: &mockChain.MockStateNotifier{}, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } // Test the first validator in registry. diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go index e88ee7ea7336..684be5286083 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go @@ -9,6 +9,8 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/holiman/uint256" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" types "github.com/prysmaticlabs/eth2-types" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" @@ -24,9 +26,31 @@ import ( "github.com/sirupsen/logrus" ) +var ( + // payloadIDCacheMiss tracks the number of payload ID requests that aren't present in the cache. + payloadIDCacheMiss = promauto.NewCounter(prometheus.CounterOpts{ + Name: "payload_id_cache_miss", + Help: "The number of payload id get requests that aren't present in the cache.", + }) + // payloadIDCacheHit tracks the number of payload ID requests that are present in the cache. + payloadIDCacheHit = promauto.NewCounter(prometheus.CounterOpts{ + Name: "payload_id_cache_hit", + Help: "The number of payload id get requests that are present in the cache.", + }) +) + // This returns the execution payload of a given slot. The function has full awareness of pre and post merge. // The payload is computed given the respected time of merge. func (vs *Server) getExecutionPayload(ctx context.Context, slot types.Slot, vIdx types.ValidatorIndex) (*enginev1.ExecutionPayload, error) { + proposerID, payloadId, ok := vs.ProposerSlotIndexCache.GetProposerPayloadIDs(slot) + if ok && proposerID == vIdx && payloadId != [8]byte{} { // Payload ID is cache hit. Return the cached payload ID. + var pid [8]byte + copy(pid[:], payloadId[:]) + payloadIDCacheHit.Inc() + return vs.ExecutionEngineCaller.GetPayload(ctx, pid) + } + payloadIDCacheMiss.Inc() + st, err := vs.HeadFetcher.HeadState(ctx) if err != nil { return nil, err diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go index 90f26f8afd05..f849df1cfd7b 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload_test.go @@ -9,6 +9,7 @@ import ( "github.com/holiman/uint256" types "github.com/prysmaticlabs/eth2-types" chainMock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" dbTest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" powtesting "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/state" @@ -106,6 +107,11 @@ func TestServer_getExecutionPayload(t *testing.T) { payloadID: &pb.PayloadIDBytes{0x1}, validatorIndx: 1, }, + { + name: "transition completed, happy case, payload ID cached)", + st: transitionSt, + validatorIndx: 100, + }, { name: "transition completed, could not prepare payload", st: transitionSt, @@ -132,10 +138,12 @@ func TestServer_getExecutionPayload(t *testing.T) { params.OverrideBeaconConfig(cfg) vs := &Server{ - ExecutionEngineCaller: &powtesting.EngineClient{PayloadIDBytes: tt.payloadID, ErrForkchoiceUpdated: tt.forkchoiceErr}, - HeadFetcher: &chainMock.ChainService{State: tt.st}, - BeaconDB: beaconDB, + ExecutionEngineCaller: &powtesting.EngineClient{PayloadIDBytes: tt.payloadID, ErrForkchoiceUpdated: tt.forkchoiceErr}, + HeadFetcher: &chainMock.ChainService{State: tt.st}, + BeaconDB: beaconDB, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } + vs.ProposerSlotIndexCache.SetProposerAndPayloadIDs(tt.st.Slot(), 100, [8]byte{100}) _, err := vs.getExecutionPayload(context.Background(), tt.st.Slot(), tt.validatorIndx) if tt.errString != "" { require.ErrorContains(t, tt.errString, err) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go index 66d8c7b86bad..72f4ec671b44 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go @@ -10,6 +10,7 @@ import ( types "github.com/prysmaticlabs/eth2-types" "github.com/prysmaticlabs/go-bitfield" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" @@ -2344,7 +2345,8 @@ func TestProposer_GetBeaconBlock_BellatrixEpoch(t *testing.T) { PayloadIDBytes: &enginev1.PayloadIDBytes{1}, ExecutionPayload: payload, }, - BeaconDB: db, + BeaconDB: db, + ProposerSlotIndexCache: cache.NewProposerPayloadIDsCache(), } randaoReveal, err := util.RandaoReveal(beaconState, 0, privKeys) diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/server.go b/beacon-chain/rpc/prysm/v1alpha1/validator/server.go index 5678d98ffe71..65af19832576 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/server.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/server.go @@ -41,6 +41,7 @@ import ( type Server struct { Ctx context.Context AttestationCache *cache.AttestationCache + ProposerSlotIndexCache *cache.ProposerPayloadIDsCache HeadFetcher blockchain.HeadFetcher ForkFetcher blockchain.ForkFetcher FinalizationFetcher blockchain.FinalizationFetcher diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index 78668fa2dc5d..6f7697d72f56 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -109,6 +109,7 @@ type Config struct { StateGen *stategen.State MaxMsgSize int ExecutionEngineCaller powchain.EngineCaller + ProposerIdsCache *cache.ProposerPayloadIDsCache } // NewService instantiates a new RPC service instance that will @@ -207,6 +208,7 @@ func (s *Service) Start() { ReplayerBuilder: ch, ExecutionEngineCaller: s.cfg.ExecutionEngineCaller, BeaconDB: s.cfg.BeaconDB, + ProposerSlotIndexCache: s.cfg.ProposerIdsCache, } validatorServerV1 := &validator.Server{ HeadFetcher: s.cfg.HeadFetcher, diff --git a/testing/spectest/shared/common/forkchoice/BUILD.bazel b/testing/spectest/shared/common/forkchoice/BUILD.bazel index 5aecfec048f7..07e5d574cb3e 100644 --- a/testing/spectest/shared/common/forkchoice/BUILD.bazel +++ b/testing/spectest/shared/common/forkchoice/BUILD.bazel @@ -13,6 +13,7 @@ go_library( deps = [ "//beacon-chain/blockchain:go_default_library", "//beacon-chain/blockchain/testing:go_default_library", + "//beacon-chain/cache:go_default_library", "//beacon-chain/cache/depositcache:go_default_library", "//beacon-chain/core/time:go_default_library", "//beacon-chain/db/testing:go_default_library", diff --git a/testing/spectest/shared/common/forkchoice/service.go b/testing/spectest/shared/common/forkchoice/service.go index 8e5b10167020..f246fb0a54ee 100644 --- a/testing/spectest/shared/common/forkchoice/service.go +++ b/testing/spectest/shared/common/forkchoice/service.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" coreTime "github.com/prysmaticlabs/prysm/beacon-chain/core/time" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" @@ -57,6 +58,7 @@ func startChainService(t *testing.T, st state.BeaconState, block block.SignedBea blockchain.WithStateNotifier(&mock.MockStateNotifier{}), blockchain.WithAttestationPool(attestations.NewPool()), blockchain.WithDepositCache(depositCache), + blockchain.WithProposerIdsCache(cache.NewProposerPayloadIDsCache()), ) service, err := blockchain.NewService(context.Background(), opts...) require.NoError(t, err)