Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(validator): listen and get metadata only for committee validators #1787

Merged
merged 18 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"go.uber.org/zap"

"github.com/ssvlabs/ssv/network/records"
protocolp2p "github.com/ssvlabs/ssv/protocol/v2/p2p"
)

Expand Down Expand Up @@ -46,6 +47,8 @@ type P2PNetwork interface {
SubscribeRandoms(logger *zap.Logger, numSubnets int) error
// UpdateScoreParams will update the scoring parameters of GossipSub
UpdateScoreParams(logger *zap.Logger)
//ActiveSubnets returns active subnets
ActiveSubnets() records.Subnets

// used for tests and api
PeersByTopic() ([]peer.ID, map[string][]peer.ID)
Expand Down
12 changes: 6 additions & 6 deletions network/p2p/p2p_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,6 @@ func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error {
ids.Start()
}

subnetsProvider := func() records.Subnets {
return n.activeSubnets
}

// Handshake filters
filters := func() []connections.HandshakeFilter {
newDomain := n.cfg.Network.DomainType
Expand All @@ -224,19 +220,23 @@ func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error {
IDService: ids,
Network: n.host.Network(),
DomainType: n.cfg.Network.DomainType,
SubnetsProvider: subnetsProvider,
SubnetsProvider: n.ActiveSubnets,
}, filters)

n.host.SetStreamHandler(peers.NodeInfoProtocol, handshaker.Handler(logger))
logger.Debug("handshaker is ready")

n.connHandler = connections.NewConnHandler(n.ctx, handshaker, subnetsProvider, n.idx, n.idx, n.idx, n.metrics)
n.connHandler = connections.NewConnHandler(n.ctx, handshaker, n.ActiveSubnets, n.idx, n.idx, n.idx, n.metrics)
n.host.Network().Notify(n.connHandler.Handle(logger))
logger.Debug("connection handler is ready")

return nil
}

func (n *p2pNetwork) ActiveSubnets() records.Subnets {
return n.activeSubnets
}

func (n *p2pNetwork) setupDiscovery(logger *zap.Logger) error {
ipAddr, err := p2pcommons.IPAddr()
if err != nil {
Expand Down
39 changes: 34 additions & 5 deletions operator/validator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/message/validation"
"github.com/ssvlabs/ssv/network"
networkcommons "github.com/ssvlabs/ssv/network/commons"
"github.com/ssvlabs/ssv/network/records"
"github.com/ssvlabs/ssv/networkconfig"
operatordatastore "github.com/ssvlabs/ssv/operator/datastore"
"github.com/ssvlabs/ssv/operator/duties"
Expand Down Expand Up @@ -138,6 +140,7 @@ type P2PNetwork interface {
protocolp2p.Broadcaster
UseMessageRouter(router network.MessageRouter)
SubscribeRandoms(logger *zap.Logger, numSubnets int) error
ActiveSubnets() records.Subnets
}

// controller implements Controller
Expand Down Expand Up @@ -456,13 +459,18 @@ func (c *controller) StartValidators() {
return
}

mySubnets := c.selfSubnets()

var ownShares []*ssvtypes.SSVShare
var allPubKeys = make([][]byte, 0, len(shares))
var pubKeysToFetch [][]byte
for _, share := range shares {
if c.operatorDataStore.GetOperatorID() != 0 && share.BelongsToOperator(c.operatorDataStore.GetOperatorID()) {
ownShares = append(ownShares, share)
}
y0sher marked this conversation as resolved.
Show resolved Hide resolved
allPubKeys = append(allPubKeys, share.ValidatorPubKey[:])
subnet := networkcommons.CommitteeSubnet(share.CommitteeID())
if uint64(len(mySubnets)) >= subnet && mySubnets[subnet] != 0 {
nkryuchkov marked this conversation as resolved.
Show resolved Hide resolved
pubKeysToFetch = append(pubKeysToFetch, share.ValidatorPubKey[:])
}
}

if c.validatorOptions.Exporter {
Expand Down Expand Up @@ -494,20 +502,33 @@ func (c *controller) StartValidators() {
}
if !hasMetadata {
start := time.Now()
err := c.fetchAndUpdateValidatorsMetadata(c.logger, allPubKeys, c.beacon)
err := c.fetchAndUpdateValidatorsMetadata(c.logger, pubKeysToFetch, c.beacon)
if err != nil {
c.logger.Error("failed to update validators metadata after setup",
zap.Int("shares", len(allPubKeys)),
zap.Int("shares", len(pubKeysToFetch)),
fields.Took(time.Since(start)),
zap.Error(err))
} else {
c.logger.Debug("updated validators metadata after setup",
zap.Int("shares", len(allPubKeys)),
zap.Int("shares", len(pubKeysToFetch)),
fields.Took(time.Since(start)))
}
}
}

func (c *controller) selfSubnets() records.Subnets {
myValidators := c.validatorStore.OperatorValidators(c.operatorDataStore.GetOperatorID())
mySubnets := make(records.Subnets, networkcommons.SubnetsCount)
for _, v := range myValidators {
subnet := networkcommons.CommitteeSubnet(v.CommitteeID())
if mySubnets[subnet] == 0 {
mySubnets[subnet] = 1
}
nkryuchkov marked this conversation as resolved.
Show resolved Hide resolved
}

return mySubnets
}

// setupValidators setup and starts validators from the given shares.
// shares w/o validator's metadata won't start, but the metadata will be fetched and the validator will start afterwards
func (c *controller) setupValidators(shares []*ssvtypes.SSVShare) ([]*validator.Validator, []*validator.Committee) {
Expand Down Expand Up @@ -969,13 +990,21 @@ func (c *controller) UpdateValidatorMetaDataLoop() {
var sleep = 2 * time.Second

for {
activeSubnets := c.network.ActiveSubnets()

// Get the shares to fetch metadata for.
start := time.Now()
var existingShares, newShares []*ssvtypes.SSVShare
c.sharesStorage.Range(nil, func(share *ssvtypes.SSVShare) bool {
if share.Liquidated {
return true
}

y0sher marked this conversation as resolved.
Show resolved Hide resolved
subnet := networkcommons.CommitteeSubnet(share.CommitteeID())
nkryuchkov marked this conversation as resolved.
Show resolved Hide resolved
if uint64(len(activeSubnets)) < subnet || activeSubnets[subnet] == 0 {
return true
}

if share.BeaconMetadata == nil && share.MetadataLastUpdated().IsZero() {
newShares = append(newShares, share)
} else if time.Since(share.MetadataLastUpdated()) > c.metadataUpdateInterval {
Expand Down
32 changes: 22 additions & 10 deletions operator/validator/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
ibftstorage "github.com/ssvlabs/ssv/ibft/storage"
"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/network"
"github.com/ssvlabs/ssv/network/commons"
"github.com/ssvlabs/ssv/networkconfig"
operatordatastore "github.com/ssvlabs/ssv/operator/datastore"
"github.com/ssvlabs/ssv/operator/keys"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/ssvlabs/ssv/protocol/v2/ssv/validator"
"github.com/ssvlabs/ssv/protocol/v2/types"
registrystorage "github.com/ssvlabs/ssv/registry/storage"
registrystoragemocks "github.com/ssvlabs/ssv/registry/storage/mocks"
"github.com/ssvlabs/ssv/storage/basedb"
"github.com/ssvlabs/ssv/storage/kv"
)
Expand All @@ -62,6 +64,7 @@ type MockControllerOptions struct {
signer spectypes.BeaconSigner
StorageMap *ibftstorage.QBFTStores
validatorsMap *validators.ValidatorsMap
validatorStore registrystorage.ValidatorStore
operatorDataStore operatordatastore.OperatorDataStore
operatorStorage registrystorage.Operators
networkConfig networkconfig.NetworkConfig
Expand Down Expand Up @@ -190,15 +193,14 @@ func TestSetupValidatorsExporter(t *testing.T) {
testCases := []struct {
name string
shareStorageListResponse []*types.SSVShare
expectMetadataFetch bool
syncHighestDecidedResponse error
getValidatorDataResponse error
}{
{"no shares of non committee", nil, false, nil, nil},
{"set up non committee validators", sharesWithMetadata, false, nil, nil},
{"set up non committee validators without metadata", sharesWithoutMetadata, true, nil, nil},
{"fail to sync highest decided", sharesWithMetadata, false, errors.New("failed to sync highest decided"), nil},
{"fail to update validators metadata", sharesWithMetadata, false, nil, errors.New("could not update all validators")},
{"no shares of non committee", nil, nil, nil},
{"set up non committee validators", sharesWithMetadata, nil, nil},
{"set up non committee validators without metadata", sharesWithoutMetadata, nil, nil},
{"fail to sync highest decided", sharesWithMetadata, errors.New("failed to sync highest decided"), nil},
{"fail to update validators metadata", sharesWithMetadata, nil, errors.New("could not update all validators")},
}

for _, tc := range testCases {
Expand All @@ -208,6 +210,13 @@ func TestSetupValidatorsExporter(t *testing.T) {
defer ctrl.Finish()
mockValidatorsMap := validators.New(context.TODO())

subnets := [commons.SubnetsCount]byte{}
for _, share := range sharesWithMetadata {
subnets[commons.CommitteeSubnet(share.CommitteeID())] = 1
}

network.EXPECT().ActiveSubnets().Return(subnets[:]).AnyTimes()

if tc.shareStorageListResponse == nil {
sharesStorage.EXPECT().List(gomock.Any(), gomock.Any()).Return(tc.shareStorageListResponse).Times(1)
} else {
Expand All @@ -227,14 +236,15 @@ func TestSetupValidatorsExporter(t *testing.T) {
}
}
}).AnyTimes()
if tc.expectMetadataFetch {
bc.EXPECT().GetValidatorData(gomock.Any()).Return(bcResponse, tc.getValidatorDataResponse).Times(1)
bc.EXPECT().GetBeaconNetwork().Return(networkconfig.Mainnet.Beacon.GetBeaconNetwork()).AnyTimes()
}
bc.EXPECT().GetValidatorData(gomock.Any()).Return(bcResponse, tc.getValidatorDataResponse).AnyTimes()
bc.EXPECT().GetBeaconNetwork().Return(networkconfig.Mainnet.Beacon.GetBeaconNetwork()).AnyTimes()
sharesStorage.EXPECT().UpdateValidatorsMetadata(gomock.Any()).Return(nil).AnyTimes()
recipientStorage.EXPECT().GetRecipientData(gomock.Any(), gomock.Any()).Return(recipientData, true, nil).AnyTimes()
}

mockValidatorStore := registrystoragemocks.NewMockValidatorStore(ctrl)
mockValidatorStore.EXPECT().OperatorValidators(gomock.Any()).Return(sharesWithMetadata).AnyTimes()

validatorStartFunc := func(validator *validator.Validator) (bool, error) {
return true, nil
}
Expand All @@ -246,6 +256,7 @@ func TestSetupValidatorsExporter(t *testing.T) {
sharesStorage: sharesStorage,
recipientsStorage: recipientStorage,
validatorsMap: mockValidatorsMap,
validatorStore: mockValidatorStore,
validatorOptions: validator.Options{
Exporter: true,
},
Expand Down Expand Up @@ -1011,6 +1022,7 @@ func setupController(logger *zap.Logger, opts MockControllerOptions) controller
sharesStorage: opts.sharesStorage,
operatorsStorage: opts.operatorStorage,
validatorsMap: opts.validatorsMap,
validatorStore: opts.validatorStore,
ctx: context.Background(),
validatorOptions: opts.validatorOptions,
recipientsStorage: opts.recipientsStorage,
Expand Down
Loading
Loading