Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:prysmaticlabs/prysm into payload…
Browse files Browse the repository at this point in the history
…id-cache
  • Loading branch information
terencechain committed Apr 6, 2022
2 parents 00d782b + 83a8327 commit c8b2d46
Show file tree
Hide file tree
Showing 30 changed files with 527 additions and 952 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/chain_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (s *Service) IsOptimistic(ctx context.Context) (bool, error) {
// IsOptimisticForRoot takes the root and slot as arguments instead of the current head
// and returns true if it is optimistic.
func (s *Service) IsOptimisticForRoot(ctx context.Context, root [32]byte) (bool, error) {
optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(ctx, root)
optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(root)
if err == nil {
return optimistic, nil
}
Expand Down
21 changes: 15 additions & 6 deletions beacon-chain/blockchain/chain_info_norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ import (
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/testing/require"
"github.com/prysmaticlabs/prysm/testing/util"
)

func TestHeadSlot_DataRace(t *testing.T) {
beaconDB := testDB.SetupDB(t)
s := &Service{
cfg: &config{BeaconDB: beaconDB},
}
b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
wait := make(chan struct{})
go func() {
defer close(wait)
require.NoError(t, s.saveHead(context.Background(), [32]byte{}))
require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b))
}()
s.HeadSlot()
<-wait
Expand All @@ -31,12 +34,14 @@ func TestHeadRoot_DataRace(t *testing.T) {
cfg: &config{BeaconDB: beaconDB, StateGen: stategen.New(beaconDB)},
head: &head{root: [32]byte{'A'}},
}
b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
wait := make(chan struct{})
go func() {
defer close(wait)
require.NoError(t, s.saveHead(context.Background(), [32]byte{}))
require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b))
}()
_, err := s.HeadRoot(context.Background())
_, err = s.HeadRoot(context.Background())
require.NoError(t, err)
<-wait
}
Expand All @@ -49,10 +54,12 @@ func TestHeadBlock_DataRace(t *testing.T) {
cfg: &config{BeaconDB: beaconDB, StateGen: stategen.New(beaconDB)},
head: &head{block: wsb},
}
b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
wait := make(chan struct{})
go func() {
defer close(wait)
require.NoError(t, s.saveHead(context.Background(), [32]byte{}))
require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b))
}()
_, err = s.HeadBlock(context.Background())
require.NoError(t, err)
Expand All @@ -64,12 +71,14 @@ func TestHeadState_DataRace(t *testing.T) {
s := &Service{
cfg: &config{BeaconDB: beaconDB, StateGen: stategen.New(beaconDB)},
}
b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
wait := make(chan struct{})
go func() {
defer close(wait)
require.NoError(t, s.saveHead(context.Background(), [32]byte{}))
require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b))
}()
_, err := s.HeadState(context.Background())
_, err = s.HeadState(context.Background())
require.NoError(t, err)
<-wait
}
28 changes: 13 additions & 15 deletions beacon-chain/blockchain/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ func (s *Service) UpdateAndSaveHeadWithBalances(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "could not update head")
}
return s.saveHead(ctx, headRoot)
headBlock, err := s.cfg.BeaconDB.Block(ctx, headRoot)
if err != nil {
return err
}
return s.saveHead(ctx, headRoot, headBlock)
}

// This defines the current chain service's view of head.
Expand Down Expand Up @@ -97,7 +101,7 @@ func (s *Service) updateHead(ctx context.Context, balances []uint64) ([32]byte,

// This saves head info to the local service cache, it also saves the
// new head root to the DB.
func (s *Service) saveHead(ctx context.Context, headRoot [32]byte) error {
func (s *Service) saveHead(ctx context.Context, headRoot [32]byte, headBlock block.SignedBeaconBlock) error {
ctx, span := trace.StartSpan(ctx, "blockChain.saveHead")
defer span.End()

Expand All @@ -109,22 +113,16 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte) error {
if headRoot == bytesutil.ToBytes32(r) {
return nil
}
if err := helpers.BeaconBlockIsNil(headBlock); err != nil {
return err
}

// If the head state is not available, just return nil.
// There's nothing to cache
if !s.cfg.BeaconDB.HasStateSummary(ctx, headRoot) {
return nil
}

// Get the new head block from DB.
newHeadBlock, err := s.cfg.BeaconDB.Block(ctx, headRoot)
if err != nil {
return err
}
if err := helpers.BeaconBlockIsNil(newHeadBlock); err != nil {
return err
}

// Get the new head state from cached state or DB.
newHeadState, err := s.cfg.StateGen.StateByRoot(ctx, headRoot)
if err != nil {
Expand All @@ -136,11 +134,11 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte) error {

// A chain re-org occurred, so we fire an event notifying the rest of the services.
headSlot := s.HeadSlot()
newHeadSlot := newHeadBlock.Block().Slot()
newHeadSlot := headBlock.Block().Slot()
oldHeadRoot := s.headRoot()
oldStateRoot := s.headBlock().Block().StateRoot()
newStateRoot := newHeadBlock.Block().StateRoot()
if bytesutil.ToBytes32(newHeadBlock.Block().ParentRoot()) != bytesutil.ToBytes32(r) {
newStateRoot := headBlock.Block().StateRoot()
if bytesutil.ToBytes32(headBlock.Block().ParentRoot()) != bytesutil.ToBytes32(r) {
log.WithFields(logrus.Fields{
"newSlot": fmt.Sprintf("%d", newHeadSlot),
"oldSlot": fmt.Sprintf("%d", headSlot),
Expand Down Expand Up @@ -172,7 +170,7 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte) error {
}

// Cache the new head info.
s.setHead(headRoot, newHeadBlock, newHeadState)
s.setHead(headRoot, headBlock, newHeadState)

// Save the new head root to DB.
if err := s.cfg.BeaconDB.SaveHeadBlockRoot(ctx, headRoot); err != nil {
Expand Down
11 changes: 6 additions & 5 deletions beacon-chain/blockchain/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ func TestSaveHead_Same(t *testing.T) {

r := [32]byte{'A'}
service.head = &head{slot: 0, root: r}

require.NoError(t, service.saveHead(context.Background(), r))
b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
require.NoError(t, service.saveHead(context.Background(), r, b))
assert.Equal(t, types.Slot(0), service.headSlot(), "Head did not stay the same")
assert.Equal(t, r, service.headRoot(), "Head did not stay the same")
}
Expand Down Expand Up @@ -68,7 +69,7 @@ func TestSaveHead_Different(t *testing.T) {
require.NoError(t, headState.SetSlot(1))
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(context.Background(), &ethpb.StateSummary{Slot: 1, Root: newRoot[:]}))
require.NoError(t, service.cfg.BeaconDB.SaveState(context.Background(), headState, newRoot))
require.NoError(t, service.saveHead(context.Background(), newRoot))
require.NoError(t, service.saveHead(context.Background(), newRoot, wsb))

assert.Equal(t, types.Slot(1), service.HeadSlot(), "Head did not change")

Expand Down Expand Up @@ -114,7 +115,7 @@ func TestSaveHead_Different_Reorg(t *testing.T) {
require.NoError(t, headState.SetSlot(1))
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(context.Background(), &ethpb.StateSummary{Slot: 1, Root: newRoot[:]}))
require.NoError(t, service.cfg.BeaconDB.SaveState(context.Background(), headState, newRoot))
require.NoError(t, service.saveHead(context.Background(), newRoot))
require.NoError(t, service.saveHead(context.Background(), newRoot, wsb))

assert.Equal(t, types.Slot(1), service.HeadSlot(), "Head did not change")

Expand Down Expand Up @@ -158,7 +159,7 @@ func TestUpdateHead_MissingJustifiedRoot(t *testing.T) {
service.store.SetBestJustifiedCheckpt(&ethpb.Checkpoint{})
headRoot, err := service.updateHead(context.Background(), []uint64{})
require.NoError(t, err)
require.NoError(t, service.saveHead(context.Background(), headRoot))
require.NoError(t, service.saveHead(context.Background(), headRoot, wsb))
}

func Test_notifyNewHeadEvent(t *testing.T) {
Expand Down
10 changes: 7 additions & 3 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,14 @@ func (s *Service) onBlock(ctx context.Context, signed block.SignedBeaconBlock, b
if err != nil {
log.WithError(err).Warn("Could not update head")
}
if _, err := s.notifyForkchoiceUpdate(ctx, s.headBlock().Block(), s.headRoot(), bytesutil.ToBytes32(finalized.Root)); err != nil {
headBlock, err := s.cfg.BeaconDB.Block(ctx, headRoot)
if err != nil {
return err
}
if _, err := s.notifyForkchoiceUpdate(ctx, headBlock.Block(), headRoot, bytesutil.ToBytes32(finalized.Root)); err != nil {
return err
}
if err := s.saveHead(ctx, headRoot); err != nil {
if err := s.saveHead(ctx, headRoot, headBlock); err != nil {
return errors.Wrap(err, "could not save head")
}

Expand Down Expand Up @@ -258,7 +262,7 @@ func (s *Service) onBlock(ctx context.Context, signed block.SignedBeaconBlock, b
if err := s.cfg.ForkChoiceStore.Prune(ctx, fRoot); err != nil {
return errors.Wrap(err, "could not prune proto array fork choice nodes")
}
isOptimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(ctx, fRoot)
isOptimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(fRoot)
if err != nil {
return errors.Wrap(err, "could not check if node is optimistically synced")
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/process_block_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *Service) updateFinalized(ctx context.Context, cp *ethpb.Checkpoint) err
}

fRoot := bytesutil.ToBytes32(cp.Root)
optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(ctx, fRoot)
optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(fRoot)
if err != nil {
return err
}
Expand Down
13 changes: 9 additions & 4 deletions beacon-chain/blockchain/receive_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,20 @@ func (s *Service) notifyEngineIfChangedHead(ctx context.Context, newHeadRoot [32
log.WithError(errNilFinalizedInStore).Error("could not get finalized checkpoint")
return
}
_, err := s.notifyForkchoiceUpdate(s.ctx,
s.headBlock().Block(),
s.headRoot(),
newHeadBlock, err := s.cfg.BeaconDB.Block(ctx, newHeadRoot)
if err != nil {
log.WithError(err).Error("Could not get block from db")
return
}
_, err = s.notifyForkchoiceUpdate(s.ctx,
newHeadBlock.Block(),
newHeadRoot,
bytesutil.ToBytes32(finalized.Root),
)
if err != nil {
log.WithError(err).Error("could not notify forkchoice update")
}
if err := s.saveHead(ctx, newHeadRoot); err != nil {
if err := s.saveHead(ctx, newHeadRoot, newHeadBlock); err != nil {
log.WithError(err).Error("could not save head")
}
}
Expand Down
15 changes: 10 additions & 5 deletions beacon-chain/blockchain/receive_attestation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,24 +144,29 @@ func TestNotifyEngineIfChangedHead(t *testing.T) {
require.LogsContain(t, hook, finalizedErr)

hook.Reset()
service.head = &head{
root: [32]byte{'a'},
block: nil, /* should not panic if notify head uses correct head */
}

b := util.NewBeaconBlock()
b.Block.Slot = 1
b.Block.Slot = 2
wsb, err := wrapper.WrappedSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb))
r, err := b.Block.HashTreeRoot()
r1, err := b.Block.HashTreeRoot()
require.NoError(t, err)
finalized := &ethpb.Checkpoint{Root: r[:], Epoch: 0}
finalized := &ethpb.Checkpoint{Root: r1[:], Epoch: 0}
st, _ := util.DeterministicGenesisState(t, 1)
service.head = &head{
slot: 1,
root: r,
root: r1,
block: wsb,
state: st,
}
service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, [8]byte{1})
service.store.SetFinalizedCheckpt(finalized)
service.notifyEngineIfChangedHead(ctx, [32]byte{'b'})
service.notifyEngineIfChangedHead(ctx, r1)
require.LogsDoNotContain(t, hook, finalizedErr)
require.LogsDoNotContain(t, hook, hookErr)
vId, payloadID, has := service.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(2)
Expand Down
8 changes: 6 additions & 2 deletions beacon-chain/blockchain/service_norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"testing"

testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/testing/require"
"github.com/prysmaticlabs/prysm/testing/util"
"github.com/sirupsen/logrus"
)

Expand All @@ -20,8 +22,10 @@ func TestChainService_SaveHead_DataRace(t *testing.T) {
s := &Service{
cfg: &config{BeaconDB: beaconDB},
}
b, err := wrapper.WrappedSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
go func() {
require.NoError(t, s.saveHead(context.Background(), [32]byte{}))
require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b))
}()
require.NoError(t, s.saveHead(context.Background(), [32]byte{}))
require.NoError(t, s.saveHead(context.Background(), [32]byte{}, b))
}
1 change: 1 addition & 0 deletions beacon-chain/forkchoice/doubly-linked-tree/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ var errInvalidProposerBoostRoot = errors.New("invalid proposer boost root")
var errUnknownFinalizedRoot = errors.New("unknown finalized root")
var errUnknownJustifiedRoot = errors.New("unknown justified root")
var errInvalidOptimisticStatus = errors.New("invalid optimistic status")
var errUnknownPayloadHash = errors.New("unknown payload hash")
7 changes: 4 additions & 3 deletions beacon-chain/forkchoice/doubly-linked-tree/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func New(justifiedEpoch, finalizedEpoch types.Epoch) *ForkChoice {
finalizedEpoch: finalizedEpoch,
proposerBoostRoot: [32]byte{},
nodeByRoot: make(map[[fieldparams.RootLength]byte]*Node),
nodeByPayload: make(map[[fieldparams.RootLength]byte]*Node),
pruneThreshold: defaultPruneThreshold,
}

Expand Down Expand Up @@ -168,7 +169,7 @@ func (f *ForkChoice) IsCanonical(root [32]byte) bool {
}

// IsOptimistic returns true if the given root has been optimistically synced.
func (f *ForkChoice) IsOptimistic(_ context.Context, root [32]byte) (bool, error) {
func (f *ForkChoice) IsOptimistic(root [32]byte) (bool, error) {
f.store.nodesLock.RLock()
defer f.store.nodesLock.RUnlock()

Expand Down Expand Up @@ -302,6 +303,6 @@ func (f *ForkChoice) ForkChoiceNodes() []*pbrpc.ForkChoiceNode {
}

// SetOptimisticToInvalid removes a block with an invalid execution payload from fork choice store
func (f *ForkChoice) SetOptimisticToInvalid(ctx context.Context, root [fieldparams.RootLength]byte) ([][32]byte, error) {
return f.store.removeNode(ctx, root)
func (f *ForkChoice) SetOptimisticToInvalid(ctx context.Context, root, payloadHash [fieldparams.RootLength]byte) ([][32]byte, error) {
return f.store.setOptimisticToInvalid(ctx, root, payloadHash)
}
2 changes: 1 addition & 1 deletion beacon-chain/forkchoice/doubly-linked-tree/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (n *Node) leadsToViableHead(justifiedEpoch, finalizedEpoch types.Epoch) boo
return n.bestDescendant.viableForHead(justifiedEpoch, finalizedEpoch)
}

// setNodeAndParentValidated sets the current node and the parent as validated (i.e. non-optimistic).
// setNodeAndParentValidated sets the current node and all the ancestors as validated (i.e. non-optimistic).
func (n *Node) setNodeAndParentValidated(ctx context.Context) error {
if ctx.Err() != nil {
return ctx.Err()
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/forkchoice/doubly-linked-tree/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,27 +180,27 @@ func TestNode_SetFullyValidated(t *testing.T) {
require.NoError(t, f.InsertOptimisticBlock(ctx, 4, indexToHash(4), indexToHash(3), params.BeaconConfig().ZeroHash, 1, 1))
require.NoError(t, f.InsertOptimisticBlock(ctx, 5, indexToHash(5), indexToHash(1), params.BeaconConfig().ZeroHash, 1, 1))

opt, err := f.IsOptimistic(ctx, indexToHash(5))
opt, err := f.IsOptimistic(indexToHash(5))
require.NoError(t, err)
require.Equal(t, true, opt)

opt, err = f.IsOptimistic(ctx, indexToHash(4))
opt, err = f.IsOptimistic(indexToHash(4))
require.NoError(t, err)
require.Equal(t, true, opt)

require.NoError(t, f.store.nodeByRoot[indexToHash(4)].setNodeAndParentValidated(ctx))

// block 5 should still be optimistic
opt, err = f.IsOptimistic(ctx, indexToHash(5))
opt, err = f.IsOptimistic(indexToHash(5))
require.NoError(t, err)
require.Equal(t, true, opt)

// block 4 and 3 should now be valid
opt, err = f.IsOptimistic(ctx, indexToHash(4))
opt, err = f.IsOptimistic(indexToHash(4))
require.NoError(t, err)
require.Equal(t, false, opt)

opt, err = f.IsOptimistic(ctx, indexToHash(3))
opt, err = f.IsOptimistic(indexToHash(3))
require.NoError(t, err)
require.Equal(t, false, opt)
}
Loading

0 comments on commit c8b2d46

Please sign in to comment.