diff --git a/CHANGELOG.md b/CHANGELOG.md index 25aef13914..798f783607 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -101,6 +101,31 @@ for more information on how to configure the node to work with the PoST service. * [#5367](https://github.com/spacemeshos/go-spacemesh/pull/5367) Add `no-main-override` toplevel config option and `--no-main-override` CLI option that makes it possible to run "nomain" builds on mainnet. +* [#5384](https://github.com/spacemeshos/go-spacemesh/pull/5384) to improve network stability and performance allow the + active set to be set in advance for an epoch. This allows the network to start consensus on the first layer of an epoch. + +## Release v1.2.12 + +### Improvements + +* [#5373](https://github.com/spacemeshos/go-spacemesh/pull/5373) automatic scaling of post verifying workers to a lower + value (1 by default) when POST proving starts. The workers are scaled up when POST proving finishes. + +* [#5382](https://github.com/spacemeshos/go-spacemesh/pull/5382) avoid processing same (gossiped/fetched) ATX many times + in parallel + +## Release v1.2.11 + +### Improvements + +* increased the max response data size in p2p to 40MiB + +## Release v1.2.10 + +### Improvements + +* further increased cache sizes and p2p timeouts to compensate for the increased number of nodes on the network. 
+ ## Release v1.2.9 ### Improvements diff --git a/bootstrap/updater.go b/bootstrap/updater.go index 42c5f31666..a8f030f526 100644 --- a/bootstrap/updater.go +++ b/bootstrap/updater.go @@ -41,7 +41,7 @@ const ( suffixLen = 2 SuffixBeacon = "bc" SuffixActiveSet = "as" - SuffixBoostrap = "bs" + SuffixBootstrap = "bs" httpTimeout = 5 * time.Second notifyTimeout = time.Second @@ -210,7 +210,7 @@ func (u *Updater) addUpdate(epoch types.EpochID, suffix string) { u.mu.Lock() defer u.mu.Unlock() switch suffix { - case SuffixActiveSet, SuffixBeacon, SuffixBoostrap: + case SuffixActiveSet, SuffixBeacon, SuffixBootstrap: default: return } @@ -242,7 +242,7 @@ func (u *Updater) DoIt(ctx context.Context) error { } }() for _, epoch := range requiredEpochs(current) { - verified, cached, err := u.checkEpochUpdate(ctx, epoch, SuffixBoostrap) + verified, cached, err := u.checkEpochUpdate(ctx, epoch, SuffixBootstrap) if err != nil { return err } @@ -324,8 +324,6 @@ func (u *Updater) get(ctx context.Context, uri string) (*VerifiedUpdate, []byte, return nil, nil, fmt.Errorf("scheme not supported %v", resource.Scheme) } - ctx, cancel := context.WithTimeout(ctx, httpTimeout) - defer cancel() t0 := time.Now() data, err := query(ctx, u.client, resource) if err != nil { diff --git a/bootstrap/updater_test.go b/bootstrap/updater_test.go index 54eab3c22e..ee66378972 100644 --- a/bootstrap/updater_test.go +++ b/bootstrap/updater_test.go @@ -163,7 +163,7 @@ func TestLoad(t *testing.T) { { desc: "recovery required", persisted: map[types.EpochID][]string{ - current - 2: {bootstrap.SuffixBoostrap, update1}, + current - 2: {bootstrap.SuffixBootstrap, update1}, current - 1: {bootstrap.SuffixActiveSet, update2}, current: {bootstrap.SuffixBeacon, update3}, current + 1: {bootstrap.SuffixActiveSet, update4}, @@ -234,7 +234,7 @@ func TestLoadedNotDownloadedAgain(t *testing.T) { cfg.DataDir, bootstrap.DirName, strconv.Itoa(int(epoch)), - bootstrap.UpdateName(epoch, bootstrap.SuffixBoostrap), + 
bootstrap.UpdateName(epoch, bootstrap.SuffixBootstrap), ) require.NoError(t, fs.MkdirAll(filepath.Dir(persisted), 0o700)) require.NoError(t, afero.WriteFile(fs, persisted, []byte(update), 0o400)) @@ -335,7 +335,7 @@ func TestDoIt(t *testing.T) { { desc: "in order", updates: map[string]string{ - "/" + bootstrap.UpdateName(1, bootstrap.SuffixBoostrap): update1, + "/" + bootstrap.UpdateName(1, bootstrap.SuffixBootstrap): update1, "/" + bootstrap.UpdateName(2, bootstrap.SuffixActiveSet): update2, "/" + bootstrap.UpdateName(3, bootstrap.SuffixBeacon): update3, "/" + bootstrap.UpdateName(4, bootstrap.SuffixActiveSet): update4, @@ -346,7 +346,7 @@ func TestDoIt(t *testing.T) { { desc: "bootstrap trumps others", updates: map[string]string{ - "/" + bootstrap.UpdateName(3, bootstrap.SuffixBoostrap): update1, + "/" + bootstrap.UpdateName(3, bootstrap.SuffixBootstrap): update1, "/" + bootstrap.UpdateName(3, bootstrap.SuffixActiveSet): update2, "/" + bootstrap.UpdateName(3, bootstrap.SuffixBeacon): update3, "/" + bootstrap.UpdateName(4, bootstrap.SuffixActiveSet): update4, @@ -507,7 +507,7 @@ func TestNoNewUpdate(t *testing.T) { numQ := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, http.MethodGet, r.Method) - if r.URL.String() != "/"+bootstrap.UpdateName(3, bootstrap.SuffixBoostrap) { + if r.URL.String() != "/"+bootstrap.UpdateName(3, bootstrap.SuffixBootstrap) { w.WriteHeader(http.StatusNotFound) return } diff --git a/cmd/bootstrapper/bootstrapper.go b/cmd/bootstrapper/bootstrapper.go index 744c42b0df..42e9c8338f 100644 --- a/cmd/bootstrapper/bootstrapper.go +++ b/cmd/bootstrapper/bootstrapper.go @@ -76,7 +76,7 @@ var cmd = &cobra.Command{ Short: "generate bootstrapping data", RunE: func(cmd *cobra.Command, args []string) error { if len(args) == 0 { - return fmt.Errorf("epoch not specfiied") + return fmt.Errorf("epoch not specified") } var targetEpochs []types.EpochID epochs := strings.Split(args[0], ",") diff --git 
a/cmd/bootstrapper/generator.go b/cmd/bootstrapper/generator.go index 89f871e720..87bb77e77e 100644 --- a/cmd/bootstrapper/generator.go +++ b/cmd/bootstrapper/generator.go @@ -95,7 +95,7 @@ func (g *Generator) Generate( err error ) if genBeacon && genActiveSet { - suffix = bootstrap.SuffixBoostrap + suffix = bootstrap.SuffixBootstrap } else if genBeacon { suffix = bootstrap.SuffixBeacon } else if genActiveSet { @@ -181,7 +181,7 @@ func queryBitcoin(ctx context.Context, client *http.Client, targetUrl string) (* defer resp.Body.Close() data, err := io.ReadAll(resp.Body) if err != nil { - return nil, fmt.Errorf("bootstrap read resonse: %w", err) + return nil, fmt.Errorf("bootstrap read response: %w", err) } var br BitcoinResponse err = json.Unmarshal(data, &br) @@ -205,7 +205,7 @@ func getActiveSet(ctx context.Context, endpoint string, epoch types.EpochID) ([] if err != nil { return nil, fmt.Errorf("epoch stream %v: %w", endpoint, err) } - activeSet := make([]types.ATXID, 0, 10_000) + activeSet := make([]types.ATXID, 0, 300_000) for { resp, err := stream.Recv() if errors.Is(err, io.EOF) { @@ -227,7 +227,7 @@ func (g *Generator) GenUpdate( ) (string, error) { as := make([]string, 0, len(activeSet)) for _, atx := range activeSet { - as = append(as, hex.EncodeToString(atx.Hash32().Bytes())) // no leading 0x + as = append(as, hex.EncodeToString(atx.Bytes())) // no leading 0x } var update bootstrap.Update update.Version = SchemaVersion @@ -243,11 +243,11 @@ func (g *Generator) GenUpdate( } data, err := json.Marshal(update) if err != nil { - return "", fmt.Errorf("marshal data %v: %w", string(data), err) + return "", fmt.Errorf("marshal data: %w", err) } // make sure the data is valid if err = bootstrap.ValidateSchema(data); err != nil { - return "", fmt.Errorf("invalid data %v: %w", string(data), err) + return "", fmt.Errorf("invalid data: %w", err) } filename := PersistedFilename(epoch, suffix) err = afero.WriteFile(g.fs, filename, data, 0o600) @@ -255,7 +255,6 @@ func (g 
*Generator) GenUpdate( return "", fmt.Errorf("persist epoch update %v: %w", filename, err) } g.logger.With().Info("generated update", - log.String("update", string(data)), log.String("filename", filename), ) return filename, nil diff --git a/cmd/bootstrapper/generator_test.go b/cmd/bootstrapper/generator_test.go index b15a5526c3..c357e14001 100644 --- a/cmd/bootstrapper/generator_test.go +++ b/cmd/bootstrapper/generator_test.go @@ -91,14 +91,16 @@ func launchServer(tb testing.TB, cdb *datastore.CachedDB) (grpcserver.Config, fu } } -func verifyUpdate(t *testing.T, data []byte, epoch types.EpochID, expBeacon string, expAsSize int) { - require.NoError(t, bootstrap.ValidateSchema(data)) +func verifyUpdate(tb testing.TB, data []byte, epoch types.EpochID, expBeacon string, expAsSize int) { + tb.Helper() + require.NoError(tb, bootstrap.ValidateSchema(data)) + var update bootstrap.Update - require.NoError(t, json.Unmarshal(data, &update)) - require.Equal(t, SchemaVersion, update.Version) - require.EqualValues(t, epoch, update.Data.Epoch.ID) - require.Equal(t, expBeacon, update.Data.Epoch.Beacon) - require.Len(t, update.Data.Epoch.ActiveSet, expAsSize) + require.NoError(tb, json.Unmarshal(data, &update)) + require.Equal(tb, SchemaVersion, update.Version) + require.Equal(tb, epoch.Uint32(), update.Data.Epoch.ID) + require.Equal(tb, expBeacon, update.Data.Epoch.Beacon) + require.Len(tb, update.Data.Epoch.ActiveSet, expAsSize) } func TestGenerator_Generate(t *testing.T) { diff --git a/cmd/bootstrapper/server.go b/cmd/bootstrapper/server.go index 52320d98f1..bcc0da5416 100644 --- a/cmd/bootstrapper/server.go +++ b/cmd/bootstrapper/server.go @@ -150,7 +150,7 @@ func (s *Server) GenBootstrap(ctx context.Context, epoch types.EpochID) error { if err != nil { return err } - suffix := bootstrap.SuffixBoostrap + suffix := bootstrap.SuffixBootstrap _, err = s.gen.GenUpdate(epoch, epochBeacon(epoch), actives, suffix) return err } diff --git a/cmd/bootstrapper/server_test.go 
b/cmd/bootstrapper/server_test.go index ffa633aa09..73c908b51b 100644 --- a/cmd/bootstrapper/server_test.go +++ b/cmd/bootstrapper/server_test.go @@ -87,14 +87,14 @@ func TestServer(t *testing.T) { for _, epoch := range epochs { createAtxs(t, db, epoch-1, types.RandomActiveSet(activeSetSize)) - fname := PersistedFilename(epoch, bootstrap.SuffixBoostrap) + fname := PersistedFilename(epoch, bootstrap.SuffixBootstrap) require.Eventually(t, func() bool { _, err := fs.Stat(fname) return err == nil }, 5*time.Second, 100*time.Millisecond) require.Empty(t, ch) - data := query(t, ctx, bootstrap.UpdateName(epoch, bootstrap.SuffixBoostrap)) + data := query(t, ctx, bootstrap.UpdateName(epoch, bootstrap.SuffixBootstrap)) verifyUpdate(t, data, epoch, hex.EncodeToString(epochBeacon(epoch).Bytes()), activeSetSize) require.NoError(t, fs.Remove(fname)) } diff --git a/miner/proposal_builder.go b/miner/proposal_builder.go index c48ec799cf..16921d99fa 100644 --- a/miner/proposal_builder.go +++ b/miner/proposal_builder.go @@ -72,9 +72,16 @@ type ProposalBuilder struct { tortoise votesEncoder syncer system.SyncStateProvider - mu sync.Mutex - signers map[types.NodeID]*signerSession - shared sharedSession + signers struct { + mu sync.Mutex + signers map[types.NodeID]*signerSession + } + shared sharedSession + + fallback struct { + mu sync.Mutex + data map[types.EpochID][]types.ATXID + } } type signerSession struct { @@ -251,7 +258,18 @@ func New( tortoise: trtl, syncer: syncer, conState: conState, - signers: map[types.NodeID]*signerSession{}, + signers: struct { + mu sync.Mutex + signers map[types.NodeID]*signerSession + }{ + signers: map[types.NodeID]*signerSession{}, + }, + fallback: struct { + mu sync.Mutex + data map[types.EpochID][]types.ATXID + }{ + data: map[types.EpochID][]types.ATXID{}, + }, } for _, opt := range opts { opt(pb) @@ -260,11 +278,11 @@ func New( } func (pb *ProposalBuilder) Register(signer *signing.EdSigner) { - pb.mu.Lock() - defer pb.mu.Unlock() - _, exist := 
pb.signers[signer.NodeID()] + pb.signers.mu.Lock() + defer pb.signers.mu.Unlock() + _, exist := pb.signers.signers[signer.NodeID()] if !exist { - pb.signers[signer.NodeID()] = &signerSession{ + pb.signers.signers[signer.NodeID()] = &signerSession{ signer: signer, log: pb.logger.WithFields(log.String("signer", signer.NodeID().ShortString())), } @@ -368,6 +386,20 @@ func (pb *ProposalBuilder) decideMeshHash(ctx context.Context, current types.Lay return mesh } +func (pb *ProposalBuilder) UpdateActiveSet(epoch types.EpochID, activeSet []types.ATXID) { + pb.logger.With().Info("received activeset update", + epoch, + log.Int("size", len(activeSet)), + ) + pb.fallback.mu.Lock() + defer pb.fallback.mu.Unlock() + if _, ok := pb.fallback.data[epoch]; ok { + pb.logger.With().Debug("fallback active set already exists", epoch) + return + } + pb.fallback.data[epoch] = activeSet +} + func (pb *ProposalBuilder) initSharedData(ctx context.Context, lid types.LayerID) error { if pb.shared.epoch != lid.GetEpoch() { pb.shared = sharedSession{epoch: lid.GetEpoch()} @@ -379,21 +411,36 @@ func (pb *ProposalBuilder) initSharedData(ctx context.Context, lid types.LayerID } pb.shared.beacon = beacon } - if pb.shared.active.set == nil { - weight, set, err := generateActiveSet( - pb.logger, - pb.cdb, + if pb.shared.active.set != nil { + return nil + } + + if weight, set, err := pb.fallbackActiveSet(pb.shared.epoch); err == nil { + pb.logger.With().Info("using fallback active set", pb.shared.epoch, - pb.clock.LayerToTime(pb.shared.epoch.FirstLayer()), - pb.cfg.goodAtxPercent, - pb.cfg.networkDelay, + log.Int("size", len(set)), ) - if err != nil { - return err - } + sort.Slice(set, func(i, j int) bool { + return bytes.Compare(set[i].Bytes(), set[j].Bytes()) < 0 + }) pb.shared.active.set = set pb.shared.active.weight = weight + return nil } + + weight, set, err := generateActiveSet( + pb.logger, + pb.cdb, + pb.shared.epoch, + pb.clock.LayerToTime(pb.shared.epoch.FirstLayer()), + 
pb.cfg.goodAtxPercent, + pb.cfg.networkDelay, + ) + if err != nil { + return err + } + pb.shared.active.set = set + pb.shared.active.weight = weight return nil } @@ -482,10 +529,10 @@ func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error { return err } - pb.mu.Lock() + pb.signers.mu.Lock() // don't accept registration in the middle of computing proposals - signers := maps.Values(pb.signers) - pb.mu.Unlock() + signers := maps.Values(pb.signers.signers) + pb.signers.mu.Unlock() var eg errgroup.Group eg.SetLimit(pb.cfg.workersLimit) @@ -710,6 +757,25 @@ func activesFromFirstBlock( return totalWeight, set, nil } +func (pb *ProposalBuilder) fallbackActiveSet(targetEpoch types.EpochID) (uint64, []types.ATXID, error) { + pb.fallback.mu.Lock() + defer pb.fallback.mu.Unlock() + set, ok := pb.fallback.data[targetEpoch] + if !ok { + return 0, nil, fmt.Errorf("no fallback active set for epoch %d", targetEpoch) + } + + var totalWeight uint64 + for _, id := range set { + atx, err := pb.cdb.GetAtxHeader(id) + if err != nil { + return 0, nil, err + } + totalWeight += atx.GetWeight() + } + return totalWeight, set, nil +} + func generateActiveSet( logger log.Log, cdb *datastore.CachedDB, diff --git a/miner/proposal_builder_test.go b/miner/proposal_builder_test.go index 0047f1f006..b792358430 100644 --- a/miner/proposal_builder_test.go +++ b/miner/proposal_builder_test.go @@ -204,15 +204,20 @@ type aggHash struct { } type step struct { - lid types.LayerID - beacon types.Beacon - atxs []*types.VerifiedActivationTx - ballots []*types.Ballot - activeset types.ATXIDList - identitities []identity - blocks []*types.Block - hare []types.LayerID - aggHashes []aggHash + lid types.LayerID + beacon types.Beacon + atxs []*types.VerifiedActivationTx + ballots []*types.Ballot + activeset types.ATXIDList + identities []identity + blocks []*types.Block + hare []types.LayerID + aggHashes []aggHash + + fallbackActiveSets []struct { + epoch types.EpochID + atxs types.ATXIDList 
+ } txs []types.TransactionID latestComplete types.LayerID @@ -271,6 +276,42 @@ func TestBuild(t *testing.T) { }, }, }, + { + desc: "activeset fallback with all ATXs from fallback available", + steps: []step{ + { + lid: 15, + beacon: types.Beacon{1}, + atxs: []*types.VerifiedActivationTx{ + gatx(types.ATXID{1}, 2, signer.NodeID(), 1, genAtxWithNonce(777)), + gatx(types.ATXID{2}, 2, types.NodeID{2}, 1), + gatx(types.ATXID{3}, 2, types.NodeID{3}, 1), + gatx(types.ATXID{4}, 2, types.NodeID{4}, 1), + gatx(types.ATXID{5}, 2, types.NodeID{5}, 1), + gatx(types.ATXID{6}, 2, types.NodeID{6}, 1), + }, + fallbackActiveSets: []struct { + epoch types.EpochID + atxs types.ATXIDList + }{ + {3, types.ATXIDList{{1}, {2}, {3}, {4}}}, + }, + opinion: &types.Opinion{Hash: types.Hash32{1}}, + txs: []types.TransactionID{{1}, {2}}, + latestComplete: 14, + expectProposal: expectProposal( + signer, 15, types.ATXID{1}, types.Opinion{Hash: types.Hash32{1}}, + expectEpochData( + gactiveset(types.ATXID{1}, types.ATXID{2}, types.ATXID{3}, types.ATXID{4}), + 12, + types.Beacon{1}, + ), + expectTxs([]types.TransactionID{{1}, {2}}), + expectCounters(signer, 3, types.Beacon{1}, 777, 0, 6, 9), + ), + }, + }, + }, { desc: "min active weight", opts: []Opt{WithMinimalActiveSetWeight([]types.EpochMinimalActiveWeight{{Weight: 1000}})}, @@ -460,7 +501,7 @@ func TestBuild(t *testing.T) { gatx(types.ATXID{1}, 2, signer.NodeID(), 1, genAtxWithNonce(777)), gatx(types.ATXID{2}, 2, types.NodeID{2}, 1), }, - identitities: []identity{{ + identities: []identity{{ id: types.NodeID{2}, proof: types.MalfeasanceProof{Proof: types.Proof{ Type: types.HareEquivocation, @@ -716,7 +757,7 @@ func TestBuild(t *testing.T) { if step.beacon != types.EmptyBeacon { require.NoError(t, beacons.Add(cdb, step.lid.GetEpoch(), step.beacon)) } - for _, iden := range step.identitities { + for _, iden := range step.identities { require.NoError( t, identities.SetMalicious( @@ -754,6 +795,9 @@ func TestBuild(t *testing.T) { ), ) } + for _, 
activeSet := range step.fallbackActiveSets { + builder.UpdateActiveSet(activeSet.epoch, activeSet.atxs) + } } { if step.opinion != nil { diff --git a/node/node.go b/node/node.go index 86ead1b394..59ffe1b23c 100644 --- a/node/node.go +++ b/node/node.go @@ -2,6 +2,7 @@ package node import ( + "bytes" "context" "encoding/hex" "errors" @@ -12,6 +13,7 @@ import ( "os/signal" "path/filepath" "runtime" + "sort" "syscall" "time" @@ -64,6 +66,7 @@ import ( "github.com/spacemeshos/go-spacemesh/prune" "github.com/spacemeshos/go-spacemesh/signing" "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/activesets" "github.com/spacemeshos/go-spacemesh/sql/ballots/util" "github.com/spacemeshos/go-spacemesh/sql/layers" "github.com/spacemeshos/go-spacemesh/sql/localsql" @@ -1142,11 +1145,14 @@ func (app *App) listenToUpdates(ctx context.Context) { app.errCh <- err return nil } - for update := range ch { + for { select { case <-ctx.Done(): return nil - default: + case update, ok := <-ch: + if !ok { + return nil + } if update.Data.Beacon != types.EmptyBeacon { if err := app.beaconProtocol.UpdateBeacon(update.Data.Epoch, update.Data.Beacon); err != nil { app.errCh <- err @@ -1154,8 +1160,21 @@ func (app *App) listenToUpdates(ctx context.Context) { } } if len(update.Data.ActiveSet) > 0 { - app.hOracle.UpdateActiveSet(update.Data.Epoch, update.Data.ActiveSet) + epoch := update.Data.Epoch set := update.Data.ActiveSet + sort.Slice(set, func(i, j int) bool { + return bytes.Compare(set[i].Bytes(), set[j].Bytes()) < 0 + }) + id := types.ATXIDList(set).Hash() + activeSet := &types.EpochActiveSet{ + Epoch: epoch, + Set: set, + } + activesets.Add(app.db, id, activeSet) + + app.hOracle.UpdateActiveSet(epoch, set) + app.proposalBuilder.UpdateActiveSet(epoch, set) + app.eg.Go(func() error { if err := atxsync.Download( ctx, @@ -1172,7 +1191,6 @@ func (app *App) listenToUpdates(ctx context.Context) { } } } - return nil }) }