Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

beacon/light: request finality update explicitly when necessary #29567

Merged
merged 5 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions beacon/blsync/block_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type beaconBlockSync struct {

type headTracker interface {
PrefetchHead() types.HeadInfo
ValidatedHead() (types.SignedHeader, bool)
ValidatedOptimistic() (types.OptimisticUpdate, bool)
ValidatedFinality() (types.FinalityUpdate, bool)
}

Expand All @@ -66,6 +66,7 @@ func (s *beaconBlockSync) Process(requester request.Requester, events []request.
case request.EvResponse, request.EvFail, request.EvTimeout:
sid, req, resp := event.RequestInfo()
blockRoot := common.Hash(req.(sync.ReqBeaconBlock))
log.Debug("Beacon block event", "type", event.Type.Name, "hash", blockRoot)
if resp != nil {
s.recentBlocks.Add(blockRoot, resp.(*types.BeaconBlock))
}
Expand All @@ -80,8 +81,8 @@ func (s *beaconBlockSync) Process(requester request.Requester, events []request.
}
s.updateEventFeed()
// request validated head block if unavailable and not yet requested
if vh, ok := s.headTracker.ValidatedHead(); ok {
s.tryRequestBlock(requester, vh.Header.Hash(), false)
if vh, ok := s.headTracker.ValidatedOptimistic(); ok {
s.tryRequestBlock(requester, vh.Attested.Hash(), false)
}
// request prefetch head if the given server has announced it
if prefetchHead := s.headTracker.PrefetchHead().BlockRoot; prefetchHead != (common.Hash{}) {
Expand Down Expand Up @@ -114,31 +115,30 @@ func blockHeadInfo(block *types.BeaconBlock) types.HeadInfo {
}

func (s *beaconBlockSync) updateEventFeed() {
head, ok := s.headTracker.ValidatedHead()
optimistic, ok := s.headTracker.ValidatedOptimistic()
if !ok {
return
}

validatedHead := head.Header.Hash()
validatedHead := optimistic.Attested.Hash()
headBlock, ok := s.recentBlocks.Get(validatedHead)
if !ok {
return
}

var finalizedHash common.Hash
if finality, ok := s.headTracker.ValidatedFinality(); ok {
he := head.Header.Epoch()
he := optimistic.Attested.Epoch()
fe := finality.Attested.Header.Epoch()
switch {
case he == fe:
finalizedHash = finality.Finalized.PayloadHeader.BlockHash()
case he < fe:
return
case he == fe+1:
parent, ok := s.recentBlocks.Get(head.Header.ParentRoot)
parent, ok := s.recentBlocks.Get(optimistic.Attested.ParentRoot)
if !ok || parent.Slot()/params.EpochLength == fe {
return // head is at first slot of next epoch, wait for finality update
//TODO: try to fetch finality update directly if subscription does not deliver
}
}
}
Expand All @@ -156,7 +156,7 @@ func (s *beaconBlockSync) updateEventFeed() {
return
}
s.chainHeadFeed.Send(types.ChainHeadEvent{
BeaconHead: head.Header,
BeaconHead: optimistic.Attested.Header,
Block: execBlock,
Finalized: finalizedHash,
})
Expand Down
8 changes: 6 additions & 2 deletions beacon/blsync/block_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,12 @@ func (h *testHeadTracker) PrefetchHead() types.HeadInfo {
return h.prefetch
}

func (h *testHeadTracker) ValidatedHead() (types.SignedHeader, bool) {
return h.validated, h.validated.Header != (types.Header{})
func (h *testHeadTracker) ValidatedOptimistic() (types.OptimisticUpdate, bool) {
return types.OptimisticUpdate{
Attested: types.HeaderWithExecProof{Header: h.validated.Header},
Signature: h.validated.Signature,
SignatureSlot: h.validated.SignatureSlot,
}, h.validated.Header != (types.Header{})
}

// TODO add test case for finality
Expand Down
3 changes: 3 additions & 0 deletions beacon/blsync/engineclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (ec *engineClient) updateLoop(headCh <-chan types.ChainHeadEvent) {
for {
select {
case <-ec.rootCtx.Done():
log.Debug("Stopping engine API update loop")
return

case event := <-headCh:
Expand All @@ -73,12 +74,14 @@ func (ec *engineClient) updateLoop(headCh <-chan types.ChainHeadEvent) {
fork := ec.config.ForkAtEpoch(event.BeaconHead.Epoch())
forkName := strings.ToLower(fork.Name)

log.Debug("Calling NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash())
if status, err := ec.callNewPayload(forkName, event); err == nil {
log.Info("Successful NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash(), "status", status)
} else {
log.Error("Failed NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash(), "error", err)
}

log.Debug("Calling ForkchoiceUpdated", "head", event.Block.Hash())
if status, err := ec.callForkchoiceUpdated(forkName, event); err == nil {
log.Info("Successful ForkchoiceUpdated", "head", event.Block.Hash(), "status", status)
} else {
Expand Down
16 changes: 10 additions & 6 deletions beacon/light/api/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ func (s *ApiServer) Subscribe(eventCallback func(event request.Event)) {
log.Debug("New head received", "slot", slot, "blockRoot", blockRoot)
eventCallback(request.Event{Type: sync.EvNewHead, Data: types.HeadInfo{Slot: slot, BlockRoot: blockRoot}})
},
OnSignedHead: func(head types.SignedHeader) {
log.Debug("New signed head received", "slot", head.Header.Slot, "blockRoot", head.Header.Hash(), "signerCount", head.Signature.SignerCount())
eventCallback(request.Event{Type: sync.EvNewSignedHead, Data: head})
OnOptimistic: func(update types.OptimisticUpdate) {
log.Debug("New optimistic update received", "slot", update.Attested.Slot, "blockRoot", update.Attested.Hash(), "signerCount", update.Signature.SignerCount())
eventCallback(request.Event{Type: sync.EvNewOptimisticUpdate, Data: update})
},
OnFinality: func(head types.FinalityUpdate) {
log.Debug("New finality update received", "slot", head.Attested.Slot, "blockRoot", head.Attested.Hash(), "signerCount", head.Signature.SignerCount())
eventCallback(request.Event{Type: sync.EvNewFinalityUpdate, Data: head})
OnFinality: func(update types.FinalityUpdate) {
log.Debug("New finality update received", "slot", update.Attested.Slot, "blockRoot", update.Attested.Hash(), "signerCount", update.Signature.SignerCount())
eventCallback(request.Event{Type: sync.EvNewFinalityUpdate, Data: update})
},
OnError: func(err error) {
log.Warn("Head event stream error", "err", err)
Expand Down Expand Up @@ -83,13 +83,17 @@ func (s *ApiServer) SendRequest(id request.ID, req request.Request) {
case sync.ReqBeaconBlock:
log.Debug("Beacon API: requesting block", "reqid", id, "hash", common.Hash(data))
resp, err = s.api.GetBeaconBlock(common.Hash(data))
case sync.ReqFinality:
log.Debug("Beacon API: requesting finality update")
resp, err = s.api.GetFinalityUpdate()
default:
}

if err != nil {
log.Warn("Beacon API request failed", "type", reflect.TypeOf(req), "reqid", id, "err", err)
s.eventCallback(request.Event{Type: request.EvFail, Data: request.RequestResponse{ID: id, Request: req}})
} else {
log.Debug("Beacon API request answered", "type", reflect.TypeOf(req), "reqid", id)
s.eventCallback(request.Event{Type: request.EvResponse, Data: request.RequestResponse{ID: id, Request: req, Response: resp}})
}
}()
Expand Down
77 changes: 53 additions & 24 deletions beacon/light/api/light_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/beacon/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)

var (
Expand Down Expand Up @@ -184,46 +185,56 @@ func (api *BeaconLightApi) GetBestUpdatesAndCommittees(firstPeriod, count uint64
return updates, committees, nil
}

// GetOptimisticHeadUpdate fetches a signed header based on the latest available
// optimistic update. Note that the signature should be verified by the caller
// as its validity depends on the update chain.
// GetOptimisticUpdate fetches the latest available optimistic update.
// Note that the signature should be verified by the caller as its validity
// depends on the update chain.
//
// See data structure definition here:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientoptimisticupdate
func (api *BeaconLightApi) GetOptimisticHeadUpdate() (types.SignedHeader, error) {
func (api *BeaconLightApi) GetOptimisticUpdate() (types.OptimisticUpdate, error) {
resp, err := api.httpGet("/eth/v1/beacon/light_client/optimistic_update")
if err != nil {
return types.SignedHeader{}, err
return types.OptimisticUpdate{}, err
}
return decodeOptimisticHeadUpdate(resp)
return decodeOptimisticUpdate(resp)
}

func decodeOptimisticHeadUpdate(enc []byte) (types.SignedHeader, error) {
func decodeOptimisticUpdate(enc []byte) (types.OptimisticUpdate, error) {
var data struct {
Data struct {
Header jsonBeaconHeader `json:"attested_header"`
Aggregate types.SyncAggregate `json:"sync_aggregate"`
SignatureSlot common.Decimal `json:"signature_slot"`
Version string
Data struct {
Attested jsonHeaderWithExecProof `json:"attested_header"`
Aggregate types.SyncAggregate `json:"sync_aggregate"`
SignatureSlot common.Decimal `json:"signature_slot"`
} `json:"data"`
}
if err := json.Unmarshal(enc, &data); err != nil {
return types.SignedHeader{}, err
return types.OptimisticUpdate{}, err
}
// Decode the execution payload headers.
attestedExecHeader, err := types.ExecutionHeaderFromJSON(data.Version, data.Data.Attested.Execution)
if err != nil {
return types.OptimisticUpdate{}, fmt.Errorf("invalid attested header: %v", err)
}
if data.Data.Header.Beacon.StateRoot == (common.Hash{}) {
if data.Data.Attested.Beacon.StateRoot == (common.Hash{}) {
// workaround for different event encoding format in Lodestar
if err := json.Unmarshal(enc, &data.Data); err != nil {
return types.SignedHeader{}, err
return types.OptimisticUpdate{}, err
}
}

if len(data.Data.Aggregate.Signers) != params.SyncCommitteeBitmaskSize {
return types.SignedHeader{}, errors.New("invalid sync_committee_bits length")
return types.OptimisticUpdate{}, errors.New("invalid sync_committee_bits length")
}
if len(data.Data.Aggregate.Signature) != params.BLSSignatureSize {
return types.SignedHeader{}, errors.New("invalid sync_committee_signature length")
return types.OptimisticUpdate{}, errors.New("invalid sync_committee_signature length")
}
return types.SignedHeader{
Header: data.Data.Header.Beacon,
return types.OptimisticUpdate{
Attested: types.HeaderWithExecProof{
Header: data.Data.Attested.Beacon,
PayloadHeader: attestedExecHeader,
PayloadBranch: data.Data.Attested.ExecutionBranch,
},
Signature: data.Data.Aggregate,
SignatureSlot: uint64(data.Data.SignatureSlot),
}, nil
Expand Down Expand Up @@ -411,7 +422,7 @@ func decodeHeadEvent(enc []byte) (uint64, common.Hash, error) {

type HeadEventListener struct {
OnNewHead func(slot uint64, blockRoot common.Hash)
OnSignedHead func(head types.SignedHeader)
OnOptimistic func(head types.OptimisticUpdate)
OnFinality func(head types.FinalityUpdate)
OnError func(err error)
}
Expand Down Expand Up @@ -449,21 +460,35 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
defer wg.Done()

// Request initial data.
log.Trace("Requesting initial head header")
if head, _, _, err := api.GetHeader(common.Hash{}); err == nil {
log.Trace("Retrieved initial head header", "slot", head.Slot, "hash", head.Hash())
listener.OnNewHead(head.Slot, head.Hash())
} else {
log.Debug("Failed to retrieve initial head header", "error", err)
}
if signedHead, err := api.GetOptimisticHeadUpdate(); err == nil {
listener.OnSignedHead(signedHead)
log.Trace("Requesting initial optimistic update")
if optimisticUpdate, err := api.GetOptimisticUpdate(); err == nil {
log.Trace("Retrieved initial optimistic update", "slot", optimisticUpdate.Attested.Slot, "hash", optimisticUpdate.Attested.Hash())
listener.OnOptimistic(optimisticUpdate)
} else {
log.Debug("Failed to retrieve initial optimistic update", "error", err)
}
log.Trace("Requesting initial finality update")
if finalityUpdate, err := api.GetFinalityUpdate(); err == nil {
log.Trace("Retrieved initial finality update", "slot", finalityUpdate.Finalized.Slot, "hash", finalityUpdate.Finalized.Hash())
listener.OnFinality(finalityUpdate)
} else {
log.Debug("Failed to retrieve initial finality update", "error", err)
}

log.Trace("Starting event stream processing loop")
// Receive the stream.
var stream *eventsource.Stream
select {
case stream = <-streamCh:
case <-ctx.Done():
log.Trace("Stopping event stream processing loop")
return
}

Expand All @@ -474,8 +499,10 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()

case event, ok := <-stream.Events:
if !ok {
log.Trace("Event stream closed")
return
}
log.Trace("New event received from event stream", "type", event.Event())
switch event.Event() {
case "head":
slot, blockRoot, err := decodeHeadEvent([]byte(event.Data()))
Expand All @@ -485,9 +512,9 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
listener.OnError(fmt.Errorf("error decoding head event: %v", err))
}
case "light_client_optimistic_update":
signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data()))
optimisticUpdate, err := decodeOptimisticUpdate([]byte(event.Data()))
if err == nil {
listener.OnSignedHead(signedHead)
listener.OnOptimistic(optimisticUpdate)
} else {
listener.OnError(fmt.Errorf("error decoding optimistic update event: %v", err))
}
Expand Down Expand Up @@ -521,7 +548,8 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
// established. It can only return nil when the context is canceled.
func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream {
for retry := true; retry; retry = ctxSleep(ctx, 5*time.Second) {
path := "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update"
path := "/eth/v1/events?topics=head&topics=light_client_finality_update&topics=light_client_optimistic_update"
log.Trace("Sending event subscription request")
req, err := http.NewRequestWithContext(ctx, "GET", api.url+path, nil)
if err != nil {
listener.OnError(fmt.Errorf("error creating event subscription request: %v", err))
Expand All @@ -535,6 +563,7 @@ func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadE
listener.OnError(fmt.Errorf("error creating event subscription: %v", err))
continue
}
log.Trace("Successfully created event stream")
return stream
}
return nil
Expand Down
Loading
Loading