Skip to content

Commit

Permalink
Caplin: Moving more code out of the ForkChoiceStoire (#9933)
Browse files Browse the repository at this point in the history
I also added `EthereumClock` which is an abstraction over ETH 2.0
reliance on `unix.Nano()` for better testing in the future. I just added
the `struct` this object is not used anywhere. also had to regenerate
some mocks.
  • Loading branch information
Giulio2002 authored Apr 14, 2024
1 parent 0be04e6 commit 1d95570
Show file tree
Hide file tree
Showing 31 changed files with 1,315 additions and 660 deletions.
36 changes: 24 additions & 12 deletions cl/beacon/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ledgerwatch/erigon/cl/phase1/core/state/lru"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
"github.com/ledgerwatch/erigon/cl/phase1/network/services"
"github.com/ledgerwatch/erigon/cl/pool"
"github.com/ledgerwatch/erigon/cl/validator/attestation_producer"
"github.com/ledgerwatch/erigon/cl/validator/committee_subscription"
Expand Down Expand Up @@ -75,6 +76,11 @@ type ApiHandler struct {
committeeSub *committee_subscription.CommitteeSubscribeMgmt
attestationProducer attestation_producer.AttestationDataProducer
aggregatePool aggregation.AggregationPool

// services
syncCommitteeMessagesService services.SyncCommitteeMessagesService
syncContributionAndProofsService services.SyncContributionService
aggregateAndProofsService services.AggregateAndProofService
}

func NewApiHandler(
Expand All @@ -100,6 +106,9 @@ func NewApiHandler(
syncMessagePool sync_contribution_pool.SyncContributionPool,
committeeSub *committee_subscription.CommitteeSubscribeMgmt,
aggregatePool aggregation.AggregationPool,
syncCommitteeMessagesService services.SyncCommitteeMessagesService,
syncContributionAndProofs services.SyncContributionService,
aggregateAndProofs services.AggregateAndProofService,
) *ApiHandler {
blobBundles, err := lru.New[common.Bytes48, BlobBundle]("blobs", maxBlobBundleCacheSize)
if err != nil {
Expand All @@ -121,18 +130,21 @@ func NewApiHandler(
randaoMixesPool: sync.Pool{New: func() interface{} {
return solid.NewHashVector(int(beaconChainConfig.EpochsPerHistoricalVector))
}},
sentinel: sentinel,
version: version,
routerCfg: routerCfg,
emitters: emitters,
blobStoage: blobStoage,
caplinSnapshots: caplinSnapshots,
attestationProducer: attestationProducer,
blobBundles: blobBundles,
engine: engine,
syncMessagePool: syncMessagePool,
committeeSub: committeeSub,
aggregatePool: aggregatePool,
sentinel: sentinel,
version: version,
routerCfg: routerCfg,
emitters: emitters,
blobStoage: blobStoage,
caplinSnapshots: caplinSnapshots,
attestationProducer: attestationProducer,
blobBundles: blobBundles,
engine: engine,
syncMessagePool: syncMessagePool,
committeeSub: committeeSub,
aggregatePool: aggregatePool,
syncCommitteeMessagesService: syncCommitteeMessagesService,
syncContributionAndProofsService: syncContributionAndProofs,
aggregateAndProofsService: aggregateAndProofs,
}
}

Expand Down
8 changes: 5 additions & 3 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package handler

import (
"encoding/json"
"errors"
"net/http"

"github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel"
"github.com/ledgerwatch/erigon/cl/beacon/beaconhttp"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/gossip"
"github.com/ledgerwatch/erigon/cl/phase1/network/services"
"github.com/ledgerwatch/erigon/cl/phase1/network/subnets"
"github.com/ledgerwatch/log/v3"
)
Expand Down Expand Up @@ -263,7 +265,7 @@ func (a *ApiHandler) PostEthV1ValidatorAggregatesAndProof(w http.ResponseWriter,

failures := []poolingFailure{}
for _, v := range req {
if err := a.forkchoiceStore.OnAggregateAndProof(v, false); err != nil {
if err := a.aggregateAndProofsService.ProcessMessage(r.Context(), nil, v); err != nil && !errors.Is(err, services.ErrIgnore) {
failures = append(failures, poolingFailure{Index: len(failures), Message: err.Error()})
continue
}
Expand Down Expand Up @@ -306,7 +308,7 @@ func (a *ApiHandler) PostEthV1BeaconPoolSyncCommittees(w http.ResponseWriter, r
continue
}
for _, subnet := range publishingSubnets {
if err := a.forkchoiceStore.OnSyncCommitteeMessage(v, subnet); err != nil {
if err := a.syncCommitteeMessagesService.ProcessMessage(r.Context(), &subnet, v); err != nil && !errors.Is(err, services.ErrIgnore) {
failures = append(failures, poolingFailure{Index: idx, Message: err.Error()})
break
}
Expand Down Expand Up @@ -353,7 +355,7 @@ func (a *ApiHandler) PostEthV1ValidatorContributionsAndProofs(w http.ResponseWri
}
failures := []poolingFailure{}
for idx, v := range msgs {
if err := a.forkchoiceStore.OnSignedContributionAndProof(v, false); err != nil {
if err := a.syncContributionAndProofsService.ProcessMessage(r.Context(), nil, v); err != nil && !errors.Is(err, services.ErrIgnore) {
failures = append(failures, poolingFailure{Index: idx, Message: err.Error()})
continue
}
Expand Down
22 changes: 21 additions & 1 deletion cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"github.com/ledgerwatch/erigon/cl/persistence/state/historical_states_reader"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
"github.com/ledgerwatch/erigon/cl/phase1/network/services/mock_services"
"github.com/ledgerwatch/erigon/cl/pool"
"github.com/ledgerwatch/erigon/cl/validator/validator_params"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/afero"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logger) (db kv.RwDB, blocks []*cltypes.SignedBeaconBlock, f afero.Fs, preState, postState *state.CachingBeaconState, h *ApiHandler, opPool pool.OperationsPool, syncedData *synced_data.SyncedDataManager, fcu *forkchoice.ForkChoiceStorageMock, vp *validator_params.ValidatorParams) {
Expand Down Expand Up @@ -78,6 +80,24 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
CommitmentInclusionProof: solid.NewHashVector(17),
},
})
ctrl := gomock.NewController(t)
syncCommitteeMessagesService := mock_services.NewMockSyncCommitteeMessagesService(ctrl)
syncContributionService := mock_services.NewMockSyncContributionService(ctrl)
aggregateAndProofsService := mock_services.NewMockAggregateAndProofService(ctrl)
// ctx context.Context, subnetID *uint64, msg *cltypes.SyncCommitteeMessage) error
syncCommitteeMessagesService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SyncCommitteeMessage) error {
return h.syncMessagePool.AddSyncCommitteeMessage(postState, *subnetID, msg)
}).AnyTimes()

syncContributionService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedContributionAndProof) error {
return h.syncMessagePool.AddSyncContribution(postState, msg.Message.Contribution)
}).AnyTimes()

aggregateAndProofsService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedAggregateAndProof) error {
opPool.AttestationsPool.Insert(msg.Message.Aggregate.Signature(), msg.Message.Aggregate)
return nil
}).AnyTimes()

vp = validator_params.NewValidatorParams()
h = NewApiHandler(
logger,
Expand All @@ -100,7 +120,7 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
Events: true,
Validator: true,
Lighthouse: true,
}, nil, blobStorage, nil, vp, nil, nil, fcu.SyncContributionPool, nil, nil) // TODO: add tests
}, nil, blobStorage, nil, vp, nil, nil, fcu.SyncContributionPool, nil, nil, syncCommitteeMessagesService, syncContributionService, aggregateAndProofsService) // TODO: add tests
h.Init()
return
}
3 changes: 3 additions & 0 deletions cl/beacon/handler/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (t *validatorTestSuite) SetupTest() {
nil,
nil,
t.mockAggrPool,
nil,
nil,
nil,
)
t.gomockCtrl = gomockCtrl
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"testing"

"github.com/golang/mock/gomock"
"github.com/ledgerwatch/erigon/cl/antiquary/tests"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client/block_collector"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/log/v3"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

func TestBlockCollectorAccumulateAndFlush(t *testing.T) {
Expand Down
33 changes: 19 additions & 14 deletions cl/phase1/execution_client/execution_engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions cl/phase1/forkchoice/fork_choice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestForkChoiceBasic(t *testing.T) {
require.NoError(t, utils.DecodeSSZSnappy(anchorState, anchorStateEncoded, int(clparams.AltairVersion)))
pool := pool.NewOperationsPool(&clparams.MainnetBeaconConfig)
emitters := beaconevents.NewEmitters()
store, err := forkchoice.NewForkChoiceStore(anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}), emitters, sd, nil, nil, nil, nil)
store, err := forkchoice.NewForkChoiceStore(anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}), emitters, sd, nil, nil, nil)
require.NoError(t, err)
// first steps
store.OnTick(0)
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestForkChoiceChainBellatrix(t *testing.T) {
sd := synced_data.NewSyncedDataManager(true, &clparams.MainnetBeaconConfig)
store, err := forkchoice.NewForkChoiceStore(anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{
Beacon: true,
}), emitters, sd, nil, nil, nil, nil)
}), emitters, sd, nil, nil, nil)
store.OnTick(2000)
require.NoError(t, err)
for _, block := range blocks {
Expand Down
Loading

0 comments on commit 1d95570

Please sign in to comment.