Skip to content

Commit

Permalink
Extend proposal builder to allow the use of a fallback active set. (#…
Browse files Browse the repository at this point in the history
…5378)

## Motivation
related to #5366 

Allow a fallback active set to be set for the proposal builder via the same mechanism that we use for the fallback beacon.

## Changes
- some minor fixes in `bootstrap` cmd code
- extend `ProposalBuilder` to be able to set a specific active set for an epoch
- extend the node routine that listens for fallback beacon / active set updates so that it propagates a fallback active set to the `ProposalBuilder`

## Test Plan
- added test case to `ProposalBuilder` to verify use of fallback active set if available
- manual testing on `testnet-10`

## TODO
<!-- This section should be removed when all items are complete -->
- [x] Explain motivation or link existing issue(s)
- [x] Test changes and document test plan
- [x] Update documentation as needed
- [x] Update [changelog](../CHANGELOG.md) as needed
  • Loading branch information
fasmat committed Dec 21, 2023
1 parent c580fba commit 3675992
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 64 deletions.
25 changes: 25 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,31 @@ for more information on how to configure the node to work with the PoST service.
* [#5367](https://github.com/spacemeshos/go-spacemesh/pull/5367) Add `no-main-override` toplevel config option and
`--no-main-override` CLI option that makes it possible to run "nomain" builds on mainnet.

* [#5384](https://github.com/spacemeshos/go-spacemesh/pull/5384) to improve network stability and performance, allow the
active set to be set in advance for an epoch. This allows the network to start consensus on the first layer of an epoch.

## Release v1.2.12

### Improvements

* [#5373](https://github.com/spacemeshos/go-spacemesh/pull/5373) automatic scaling of post verifying workers to a lower
value (1 by default) when POST proving starts. The workers are scaled up when POST proving finishes.

* [#5382](https://github.com/spacemeshos/go-spacemesh/pull/5382) avoid processing same (gossiped/fetched) ATX many times
in parallel

## Release v1.2.11

### Improvements

* increased the max response data size in p2p to 40MiB

## Release v1.2.10

### Improvements

* further increased cache sizes and p2p timeouts to compensate for the increased number of nodes on the network.

## Release v1.2.9

### Improvements
Expand Down
8 changes: 3 additions & 5 deletions bootstrap/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
suffixLen = 2
SuffixBeacon = "bc"
SuffixActiveSet = "as"
SuffixBoostrap = "bs"
SuffixBootstrap = "bs"

httpTimeout = 5 * time.Second
notifyTimeout = time.Second
Expand Down Expand Up @@ -210,7 +210,7 @@ func (u *Updater) addUpdate(epoch types.EpochID, suffix string) {
u.mu.Lock()
defer u.mu.Unlock()
switch suffix {
case SuffixActiveSet, SuffixBeacon, SuffixBoostrap:
case SuffixActiveSet, SuffixBeacon, SuffixBootstrap:
default:
return
}
Expand Down Expand Up @@ -242,7 +242,7 @@ func (u *Updater) DoIt(ctx context.Context) error {
}
}()
for _, epoch := range requiredEpochs(current) {
verified, cached, err := u.checkEpochUpdate(ctx, epoch, SuffixBoostrap)
verified, cached, err := u.checkEpochUpdate(ctx, epoch, SuffixBootstrap)
if err != nil {
return err
}
Expand Down Expand Up @@ -324,8 +324,6 @@ func (u *Updater) get(ctx context.Context, uri string) (*VerifiedUpdate, []byte,
return nil, nil, fmt.Errorf("scheme not supported %v", resource.Scheme)
}

ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel()
t0 := time.Now()
data, err := query(ctx, u.client, resource)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions bootstrap/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestLoad(t *testing.T) {
{
desc: "recovery required",
persisted: map[types.EpochID][]string{
current - 2: {bootstrap.SuffixBoostrap, update1},
current - 2: {bootstrap.SuffixBootstrap, update1},
current - 1: {bootstrap.SuffixActiveSet, update2},
current: {bootstrap.SuffixBeacon, update3},
current + 1: {bootstrap.SuffixActiveSet, update4},
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestLoadedNotDownloadedAgain(t *testing.T) {
cfg.DataDir,
bootstrap.DirName,
strconv.Itoa(int(epoch)),
bootstrap.UpdateName(epoch, bootstrap.SuffixBoostrap),
bootstrap.UpdateName(epoch, bootstrap.SuffixBootstrap),
)
require.NoError(t, fs.MkdirAll(filepath.Dir(persisted), 0o700))
require.NoError(t, afero.WriteFile(fs, persisted, []byte(update), 0o400))
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestDoIt(t *testing.T) {
{
desc: "in order",
updates: map[string]string{
"/" + bootstrap.UpdateName(1, bootstrap.SuffixBoostrap): update1,
"/" + bootstrap.UpdateName(1, bootstrap.SuffixBootstrap): update1,
"/" + bootstrap.UpdateName(2, bootstrap.SuffixActiveSet): update2,
"/" + bootstrap.UpdateName(3, bootstrap.SuffixBeacon): update3,
"/" + bootstrap.UpdateName(4, bootstrap.SuffixActiveSet): update4,
Expand All @@ -346,7 +346,7 @@ func TestDoIt(t *testing.T) {
{
desc: "bootstrap trumps others",
updates: map[string]string{
"/" + bootstrap.UpdateName(3, bootstrap.SuffixBoostrap): update1,
"/" + bootstrap.UpdateName(3, bootstrap.SuffixBootstrap): update1,
"/" + bootstrap.UpdateName(3, bootstrap.SuffixActiveSet): update2,
"/" + bootstrap.UpdateName(3, bootstrap.SuffixBeacon): update3,
"/" + bootstrap.UpdateName(4, bootstrap.SuffixActiveSet): update4,
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestNoNewUpdate(t *testing.T) {
numQ := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodGet, r.Method)
if r.URL.String() != "/"+bootstrap.UpdateName(3, bootstrap.SuffixBoostrap) {
if r.URL.String() != "/"+bootstrap.UpdateName(3, bootstrap.SuffixBootstrap) {
w.WriteHeader(http.StatusNotFound)
return
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/bootstrapper/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var cmd = &cobra.Command{
Short: "generate bootstrapping data",
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return fmt.Errorf("epoch not specfiied")
return fmt.Errorf("epoch not specified")
}
var targetEpochs []types.EpochID
epochs := strings.Split(args[0], ",")
Expand Down
13 changes: 6 additions & 7 deletions cmd/bootstrapper/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (g *Generator) Generate(
err error
)
if genBeacon && genActiveSet {
suffix = bootstrap.SuffixBoostrap
suffix = bootstrap.SuffixBootstrap
} else if genBeacon {
suffix = bootstrap.SuffixBeacon
} else if genActiveSet {
Expand Down Expand Up @@ -181,7 +181,7 @@ func queryBitcoin(ctx context.Context, client *http.Client, targetUrl string) (*
defer resp.Body.Close()
data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("bootstrap read resonse: %w", err)
return nil, fmt.Errorf("bootstrap read response: %w", err)
}
var br BitcoinResponse
err = json.Unmarshal(data, &br)
Expand All @@ -205,7 +205,7 @@ func getActiveSet(ctx context.Context, endpoint string, epoch types.EpochID) ([]
if err != nil {
return nil, fmt.Errorf("epoch stream %v: %w", endpoint, err)
}
activeSet := make([]types.ATXID, 0, 10_000)
activeSet := make([]types.ATXID, 0, 300_000)
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
Expand All @@ -227,7 +227,7 @@ func (g *Generator) GenUpdate(
) (string, error) {
as := make([]string, 0, len(activeSet))
for _, atx := range activeSet {
as = append(as, hex.EncodeToString(atx.Hash32().Bytes())) // no leading 0x
as = append(as, hex.EncodeToString(atx.Bytes())) // no leading 0x
}
var update bootstrap.Update
update.Version = SchemaVersion
Expand All @@ -243,19 +243,18 @@ func (g *Generator) GenUpdate(
}
data, err := json.Marshal(update)
if err != nil {
return "", fmt.Errorf("marshal data %v: %w", string(data), err)
return "", fmt.Errorf("marshal data: %w", err)
}
// make sure the data is valid
if err = bootstrap.ValidateSchema(data); err != nil {
return "", fmt.Errorf("invalid data %v: %w", string(data), err)
return "", fmt.Errorf("invalid data: %w", err)
}
filename := PersistedFilename(epoch, suffix)
err = afero.WriteFile(g.fs, filename, data, 0o600)
if err != nil {
return "", fmt.Errorf("persist epoch update %v: %w", filename, err)
}
g.logger.With().Info("generated update",
log.String("update", string(data)),
log.String("filename", filename),
)
return filename, nil
Expand Down
16 changes: 9 additions & 7 deletions cmd/bootstrapper/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,16 @@ func launchServer(tb testing.TB, cdb *datastore.CachedDB) (grpcserver.Config, fu
}
}

func verifyUpdate(t *testing.T, data []byte, epoch types.EpochID, expBeacon string, expAsSize int) {
require.NoError(t, bootstrap.ValidateSchema(data))
func verifyUpdate(tb testing.TB, data []byte, epoch types.EpochID, expBeacon string, expAsSize int) {
tb.Helper()
require.NoError(tb, bootstrap.ValidateSchema(data))

var update bootstrap.Update
require.NoError(t, json.Unmarshal(data, &update))
require.Equal(t, SchemaVersion, update.Version)
require.EqualValues(t, epoch, update.Data.Epoch.ID)
require.Equal(t, expBeacon, update.Data.Epoch.Beacon)
require.Len(t, update.Data.Epoch.ActiveSet, expAsSize)
require.NoError(tb, json.Unmarshal(data, &update))
require.Equal(tb, SchemaVersion, update.Version)
require.Equal(tb, epoch.Uint32(), update.Data.Epoch.ID)
require.Equal(tb, expBeacon, update.Data.Epoch.Beacon)
require.Len(tb, update.Data.Epoch.ActiveSet, expAsSize)
}

func TestGenerator_Generate(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/bootstrapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (s *Server) GenBootstrap(ctx context.Context, epoch types.EpochID) error {
if err != nil {
return err
}
suffix := bootstrap.SuffixBoostrap
suffix := bootstrap.SuffixBootstrap
_, err = s.gen.GenUpdate(epoch, epochBeacon(epoch), actives, suffix)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/bootstrapper/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ func TestServer(t *testing.T) {

for _, epoch := range epochs {
createAtxs(t, db, epoch-1, types.RandomActiveSet(activeSetSize))
fname := PersistedFilename(epoch, bootstrap.SuffixBoostrap)
fname := PersistedFilename(epoch, bootstrap.SuffixBootstrap)
require.Eventually(t, func() bool {
_, err := fs.Stat(fname)
return err == nil
}, 5*time.Second, 100*time.Millisecond)
require.Empty(t, ch)

data := query(t, ctx, bootstrap.UpdateName(epoch, bootstrap.SuffixBoostrap))
data := query(t, ctx, bootstrap.UpdateName(epoch, bootstrap.SuffixBootstrap))
verifyUpdate(t, data, epoch, hex.EncodeToString(epochBeacon(epoch).Bytes()), activeSetSize)
require.NoError(t, fs.Remove(fname))
}
Expand Down
108 changes: 87 additions & 21 deletions miner/proposal_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,16 @@ type ProposalBuilder struct {
tortoise votesEncoder
syncer system.SyncStateProvider

mu sync.Mutex
signers map[types.NodeID]*signerSession
shared sharedSession
signers struct {
mu sync.Mutex
signers map[types.NodeID]*signerSession
}
shared sharedSession

fallback struct {
mu sync.Mutex
data map[types.EpochID][]types.ATXID
}
}

type signerSession struct {
Expand Down Expand Up @@ -251,7 +258,18 @@ func New(
tortoise: trtl,
syncer: syncer,
conState: conState,
signers: map[types.NodeID]*signerSession{},
signers: struct {
mu sync.Mutex
signers map[types.NodeID]*signerSession
}{
signers: map[types.NodeID]*signerSession{},
},
fallback: struct {
mu sync.Mutex
data map[types.EpochID][]types.ATXID
}{
data: map[types.EpochID][]types.ATXID{},
},
}
for _, opt := range opts {
opt(pb)
Expand All @@ -260,11 +278,11 @@ func New(
}

func (pb *ProposalBuilder) Register(signer *signing.EdSigner) {
pb.mu.Lock()
defer pb.mu.Unlock()
_, exist := pb.signers[signer.NodeID()]
pb.signers.mu.Lock()
defer pb.signers.mu.Unlock()
_, exist := pb.signers.signers[signer.NodeID()]
if !exist {
pb.signers[signer.NodeID()] = &signerSession{
pb.signers.signers[signer.NodeID()] = &signerSession{
signer: signer,
log: pb.logger.WithFields(log.String("signer", signer.NodeID().ShortString())),
}
Expand Down Expand Up @@ -368,6 +386,20 @@ func (pb *ProposalBuilder) decideMeshHash(ctx context.Context, current types.Lay
return mesh
}

// UpdateActiveSet records activeSet as the fallback active set for epoch.
//
// The first set stored for an epoch wins: when an entry for epoch is already
// present, the call only emits a debug log and leaves the stored set untouched.
// The slice is stored as passed in, without copying.
func (pb *ProposalBuilder) UpdateActiveSet(epoch types.EpochID, activeSet []types.ATXID) {
	size := len(activeSet)
	pb.logger.With().Info("received activeset update", epoch, log.Int("size", size))

	pb.fallback.mu.Lock()
	defer pb.fallback.mu.Unlock()

	_, known := pb.fallback.data[epoch]
	if !known {
		pb.fallback.data[epoch] = activeSet
		return
	}
	pb.logger.With().Debug("fallback active set already exists", epoch)
}

func (pb *ProposalBuilder) initSharedData(ctx context.Context, lid types.LayerID) error {
if pb.shared.epoch != lid.GetEpoch() {
pb.shared = sharedSession{epoch: lid.GetEpoch()}
Expand All @@ -379,21 +411,36 @@ func (pb *ProposalBuilder) initSharedData(ctx context.Context, lid types.LayerID
}
pb.shared.beacon = beacon
}
if pb.shared.active.set == nil {
weight, set, err := generateActiveSet(
pb.logger,
pb.cdb,
if pb.shared.active.set != nil {
return nil
}

if weight, set, err := pb.fallbackActiveSet(pb.shared.epoch); err == nil {
pb.logger.With().Info("using fallback active set",
pb.shared.epoch,
pb.clock.LayerToTime(pb.shared.epoch.FirstLayer()),
pb.cfg.goodAtxPercent,
pb.cfg.networkDelay,
log.Int("size", len(set)),
)
if err != nil {
return err
}
sort.Slice(set, func(i, j int) bool {
return bytes.Compare(set[i].Bytes(), set[j].Bytes()) < 0
})
pb.shared.active.set = set
pb.shared.active.weight = weight
return nil
}

weight, set, err := generateActiveSet(
pb.logger,
pb.cdb,
pb.shared.epoch,
pb.clock.LayerToTime(pb.shared.epoch.FirstLayer()),
pb.cfg.goodAtxPercent,
pb.cfg.networkDelay,
)
if err != nil {
return err
}
pb.shared.active.set = set
pb.shared.active.weight = weight
return nil
}

Expand Down Expand Up @@ -482,10 +529,10 @@ func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error {
return err
}

pb.mu.Lock()
pb.signers.mu.Lock()
// don't accept registration in the middle of computing proposals
signers := maps.Values(pb.signers)
pb.mu.Unlock()
signers := maps.Values(pb.signers.signers)
pb.signers.mu.Unlock()

var eg errgroup.Group
eg.SetLimit(pb.cfg.workersLimit)
Expand Down Expand Up @@ -710,6 +757,25 @@ func activesFromFirstBlock(
return totalWeight, set, nil
}

// fallbackActiveSet returns the fallback active set registered for
// targetEpoch together with the sum of the weights of its ATXs.
//
// It returns an error when no fallback set is stored for the epoch or when
// the header of any ATX in the set cannot be loaded from pb.cdb.
func (pb *ProposalBuilder) fallbackActiveSet(targetEpoch types.EpochID) (uint64, []types.ATXID, error) {
	// Hold the lock only for the map lookup so that UpdateActiveSet is not
	// blocked while the header of every ATX in the set is loaded below.
	// UpdateActiveSet never replaces an existing entry, so the slice read
	// here stays valid after the lock is released.
	pb.fallback.mu.Lock()
	set, ok := pb.fallback.data[targetEpoch]
	pb.fallback.mu.Unlock()
	if !ok {
		return 0, nil, fmt.Errorf("no fallback active set for epoch %d", targetEpoch)
	}

	var totalWeight uint64
	for _, id := range set {
		atx, err := pb.cdb.GetAtxHeader(id)
		if err != nil {
			// Wrap so the caller can tell which ATX failed while still
			// being able to match the underlying error with errors.Is/As.
			return 0, nil, fmt.Errorf("get header of fallback ATX %v: %w", id, err)
		}
		totalWeight += atx.GetWeight()
	}
	return totalWeight, set, nil
}

func generateActiveSet(
logger log.Log,
cdb *datastore.CachedDB,
Expand Down
Loading

0 comments on commit 3675992

Please sign in to comment.