Skip to content

Commit

Permalink
apply changes from #1969
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Jan 12, 2025
1 parent f8ede80 commit db78db1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
6 changes: 3 additions & 3 deletions network/commons/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ func CommitteeSubnet(cid spectypes.CommitteeID) uint64 {
}

// SetCommitteeSubnet returns the subnet for the given committee, it doesn't allocate memory but uses the passed in big.Int
func SetCommitteeSubnet(bigInst *big.Int, cid spectypes.CommitteeID) {
bigInst.SetBytes(cid[:])
bigInst.Mod(bigInst, bigIntSubnetsCount)
func SetCommitteeSubnet(bigInt *big.Int, cid spectypes.CommitteeID) {
bigInt.SetBytes(cid[:])
bigInt.Mod(bigInt, bigIntSubnetsCount)
}

// MsgIDFunc is the function that maps a message to a msg_id
Expand Down
37 changes: 25 additions & 12 deletions operator/validator/metadata/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metadata
import (
"context"
"fmt"
"math/big"
"time"

"github.com/attestantio/go-eth2-client/spec/phase0"
Expand Down Expand Up @@ -90,14 +91,16 @@ func (s *Syncer) SyncOnStartup(ctx context.Context) (map[spectypes.ValidatorPK]*
return nil, nil
}

ownSubnets := s.selfSubnets()
subnetsBuf := new(big.Int)
ownSubnets := s.selfSubnets(subnetsBuf)

// Skip syncing if metadata was already fetched before
// to prevent blocking startup after first sync.
needToSync := false
pubKeysToFetch := make([]spectypes.ValidatorPK, 0, len(shares))
for _, share := range shares {
subnet := networkcommons.CommitteeSubnet(share.CommitteeID())
networkcommons.SetCommitteeSubnet(subnetsBuf, share.CommitteeID())
subnet := subnetsBuf.Uint64()
if ownSubnets[subnet] == 0 {
continue
}
Expand Down Expand Up @@ -187,8 +190,10 @@ func (s *Syncer) Stream(ctx context.Context) <-chan SyncBatch {
go func() {
defer close(metadataUpdates)

subnetsBuf := new(big.Int)

for {
batch, done, err := s.syncNextBatch(ctx)
batch, done, err := s.syncNextBatch(ctx, subnetsBuf)
if err != nil {
s.logger.Warn("failed to prepare validators metadata",
zap.Error(err),
Expand Down Expand Up @@ -234,15 +239,15 @@ func (s *Syncer) Stream(ctx context.Context) <-chan SyncBatch {
// It is used only by Stream method.
// The maximal size is batchSize as we want to reduce the load while streaming.
// Therefore, syncNextBatch should be called in a loop, so the rest will be prepared by next calls.
func (s *Syncer) syncNextBatch(ctx context.Context) (SyncBatch, bool, error) {
func (s *Syncer) syncNextBatch(ctx context.Context, subnetsBuf *big.Int) (SyncBatch, bool, error) {
// TODO: Methods called here don't handle context, so this is a workaround to handle done context. It should be removed once ctx is handled gracefully.
select {
case <-ctx.Done():
return SyncBatch{}, false, ctx.Err()
default:
}

shares := s.nextBatch(ctx)
shares := s.nextBatch(ctx, subnetsBuf)
if len(shares) == 0 {
return SyncBatch{}, false, nil
}
Expand Down Expand Up @@ -271,18 +276,18 @@ func (s *Syncer) syncNextBatch(ctx context.Context) (SyncBatch, bool, error) {
}

// nextBatch returns non-liquidated shares from DB that are most deserving of an update, it relies on share.Metadata.lastUpdated to be updated in order to keep iterating forward.
func (s *Syncer) nextBatch(_ context.Context) []*ssvtypes.SSVShare {
func (s *Syncer) nextBatch(_ context.Context, subnetsBuf *big.Int) []*ssvtypes.SSVShare {
// TODO: use context, return if it's done
ownSubnets := s.selfSubnets()
ownSubnets := s.selfSubnets(subnetsBuf)

var staleShares, newShares []*ssvtypes.SSVShare
s.shareStorage.Range(nil, func(share *ssvtypes.SSVShare) bool {
if share.Liquidated {
return true
}

subnet := networkcommons.CommitteeSubnet(share.CommitteeID())

networkcommons.SetCommitteeSubnet(subnetsBuf, share.CommitteeID())
subnet := subnetsBuf.Uint64()
if ownSubnets[subnet] == 0 {
return true
}
Expand Down Expand Up @@ -341,12 +346,20 @@ func (s *Syncer) sleep(ctx context.Context, d time.Duration) (slept bool) {
}
}

func (s *Syncer) selfSubnets() records.Subnets {
// selfSubnets calculates the operator's subnets by adding up the fixed subnets and the active committees
// it recvs big int buffer for memory reusing, if is nil it will allocate new
func (s *Syncer) selfSubnets(buf *big.Int) records.Subnets {
// Start off with a copy of the fixed subnets (e.g., exporter subscribed to all subnets).
localBuf := buf
if localBuf == nil {
localBuf = new(big.Int)
}

myValidators := s.validatorStore.SelfValidators()
mySubnets := make(records.Subnets, networkcommons.SubnetsCount)
for _, v := range myValidators {
subnet := networkcommons.CommitteeSubnet(v.CommitteeID())
mySubnets[subnet] = 1
networkcommons.SetCommitteeSubnet(localBuf, v.CommitteeID())
mySubnets[localBuf.Uint64()] = 1
}

return mySubnets
Expand Down

0 comments on commit db78db1

Please sign in to comment.