Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: optimize subnet calculations using big.Int for reduced memory usage #1969

Merged
merged 5 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion network/commons/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ const (
topicPrefix = "ssv.v2"
)

// BigIntSubnetsCount is the big.Int representation of SubnetsCount
var bigIntSubnetsCount *big.Int

func init() {
bigIntSubnetsCount = new(big.Int).SetUint64(SubnetsCount)
}

const (
signatureSize = 256
signatureOffset = 0
Expand Down Expand Up @@ -57,10 +64,16 @@ func GetTopicBaseName(topicName string) string {

// CommitteeSubnet returns the subnet for the given committee
func CommitteeSubnet(cid spectypes.CommitteeID) uint64 {
subnet := new(big.Int).Mod(new(big.Int).SetBytes(cid[:]), new(big.Int).SetUint64(SubnetsCount))
subnet := new(big.Int).Mod(new(big.Int).SetBytes(cid[:]), bigIntSubnetsCount)
return subnet.Uint64()
}

// SetCommitteeSubnet returns the subnet for the given committee, it doesn't allocate memory but uses the passed in big.Int
func SetCommitteeSubnet(bigInst *big.Int, cid spectypes.CommitteeID) {
bigInst.SetBytes(cid[:])
bigInst.Set(bigInst.Mod(bigInst, bigIntSubnetsCount))
}

// MsgIDFunc is the function that maps a message to a msg_id
type MsgIDFunc func(msg []byte) string

Expand Down
31 changes: 31 additions & 0 deletions network/commons/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package commons

import (
"crypto/rand"
"math/big"
"testing"

spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/stretchr/testify/require"
)

func TestCommitteeSubnet(t *testing.T) {
require.Equal(t, Subnets(), int(bigIntSubnetsCount.Uint64()))

bigInst := new(big.Int)
for i := 0; i < Subnets()*2; i++ {
var cid spectypes.CommitteeID
if _, err := rand.Read(cid[:]); err != nil {
t.Fatal(err)
}

// Get result from CommitteeSubnet
expected := CommitteeSubnet(cid)

// Get result from SetCommitteeSubnet
SetCommitteeSubnet(bigInst, cid)
actual := bigInst.Uint64()

require.Equal(t, expected, actual)
}
}
28 changes: 21 additions & 7 deletions operator/validator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"math/big"
"sync"
"time"

Expand Down Expand Up @@ -451,15 +452,18 @@ func (c *controller) StartValidators() {
return
}

mySubnets := c.selfSubnets()
intBuf := new(big.Int)

mySubnets := c.selfSubnets(intBuf)

var ownShares []*ssvtypes.SSVShare
var pubKeysToFetch [][]byte
for _, share := range shares {
if c.operatorDataStore.GetOperatorID() != 0 && share.BelongsToOperator(c.operatorDataStore.GetOperatorID()) {
ownShares = append(ownShares, share)
}
subnet := networkcommons.CommitteeSubnet(share.CommitteeID())
networkcommons.SetCommitteeSubnet(intBuf, share.CommitteeID())
subnet := intBuf.Uint64()
if mySubnets[subnet] != 0 {
pubKeysToFetch = append(pubKeysToFetch, share.ValidatorPubKey[:])
}
Expand Down Expand Up @@ -508,16 +512,23 @@ func (c *controller) StartValidators() {
}
}

func (c *controller) selfSubnets() records.Subnets {
// selfSubnets calculates the operator's subnets by adding up the fixed subnets and the active committees
// it recvs big int buffer for memory reusing, if is nil it will allocate new
func (c *controller) selfSubnets(buf *big.Int) records.Subnets {
// Start off with a copy of the fixed subnets (e.g., exporter subscribed to all subnets).
localBuf := buf
if localBuf == nil {
localBuf = new(big.Int)
}

mySubnets := make(records.Subnets, networkcommons.Subnets())
copy(mySubnets, c.network.FixedSubnets())

// Compute the new subnets according to the active committees/validators.
myValidators := c.validatorStore.OperatorValidators(c.operatorDataStore.GetOperatorID())
for _, v := range myValidators {
subnet := networkcommons.CommitteeSubnet(v.CommitteeID())
mySubnets[subnet] = 1
networkcommons.SetCommitteeSubnet(localBuf, v.CommitteeID())
mySubnets[localBuf.Uint64()] = 1
}

return mySubnets
Expand Down Expand Up @@ -983,18 +994,21 @@ func (c *controller) UpdateValidatorMetaDataLoop() {
const batchSize = 512
var sleep = 2 * time.Second

intBuf := new(big.Int)

for {
// Get the shares to fetch metadata for.
start := time.Now()

mySubnets := c.selfSubnets()
mySubnets := c.selfSubnets(intBuf)
var existingShares, newShares []*ssvtypes.SSVShare
c.sharesStorage.Range(nil, func(share *ssvtypes.SSVShare) bool {
if share.Liquidated {
return true
}

subnet := networkcommons.CommitteeSubnet(share.CommitteeID())
networkcommons.SetCommitteeSubnet(intBuf, share.CommitteeID())
subnet := intBuf.Uint64()
if mySubnets[subnet] == 0 {
return true
}
Expand Down
Loading