From c580fbac09987f86aa1f2d9ab8320e27d6188d04 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 21 Dec 2023 10:37:25 +0000 Subject: [PATCH] spawn a process to download atxs hinted by a server (#5377) related: https://github.com/spacemeshos/go-spacemesh/issues/5366 this change disables sync reconciliation at the end and start of epoch, normal atx downloading works the way it was working. instead it will ask peers for atxs that are recorded in the set, shared by the server. --- config/mainnet.go | 1 + node/node.go | 15 ++++ syncer/atxsync/atxsync.go | 75 ++++++++++++++++++++ syncer/atxsync/atxsync_test.go | 126 +++++++++++++++++++++++++++++++++ syncer/atxsync/mocks/mocks.go | 78 ++++++++++++++++++++ syncer/syncer.go | 6 +- 6 files changed, 300 insertions(+), 1 deletion(-) create mode 100644 syncer/atxsync/atxsync.go create mode 100644 syncer/atxsync/atxsync_test.go create mode 100644 syncer/atxsync/mocks/mocks.go diff --git a/config/mainnet.go b/config/mainnet.go index 6c42334c60..bc21d26410 100644 --- a/config/mainnet.go +++ b/config/mainnet.go @@ -172,6 +172,7 @@ func MainnetConfig() Config { Standalone: false, GossipDuration: 50 * time.Second, OutOfSyncThresholdLayers: 36, // 3h + DisableAtxReconciliation: true, }, Recovery: checkpoint.DefaultConfig(), Cache: datastore.DefaultConfig(), diff --git a/node/node.go b/node/node.go index cf7ac874ae..86ead1b394 100644 --- a/node/node.go +++ b/node/node.go @@ -69,6 +69,7 @@ import ( "github.com/spacemeshos/go-spacemesh/sql/localsql" dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics" "github.com/spacemeshos/go-spacemesh/syncer" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync" "github.com/spacemeshos/go-spacemesh/syncer/blockssync" "github.com/spacemeshos/go-spacemesh/system" "github.com/spacemeshos/go-spacemesh/timesync" @@ -1154,6 +1155,20 @@ func (app *App) listenToUpdates(ctx context.Context) { } if len(update.Data.ActiveSet) > 0 { app.hOracle.UpdateActiveSet(update.Data.Epoch, update.Data.ActiveSet) + set := update.Data.ActiveSet + app.eg.Go(func() error { + if err := atxsync.Download( + ctx, + 10*time.Second, + app.addLogger(SyncLogger, app.log).Zap(), + app.db, + app.fetcher, + set, + ); err != nil { + app.errCh <- err + } + return nil + }) } } } diff --git a/syncer/atxsync/atxsync.go b/syncer/atxsync/atxsync.go new file mode 100644 index 0000000000..c889157e6c --- /dev/null +++ b/syncer/atxsync/atxsync.go @@ -0,0 +1,75 @@ +package atxsync + +import ( + "context" + "math/rand" + "time" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/atxs" +) + +//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./atxsync.go +type atxFetcher interface { + GetAtxs(context.Context, []types.ATXID) error +} + +func getMissing(db *sql.Database, set []types.ATXID) ([]types.ATXID, error) { + missing := []types.ATXID{} + for _, atx := range set { + exist, err := atxs.Has(db, atx) + if err != nil { + return nil, err + } + if !exist { + missing = append(missing, atx) + } + } + return missing, nil +} + +// Download specified set of atxs from peers in the network. +// +// actual retry interval will be between [retryInterval, 2*retryInterval]. +func Download( + ctx context.Context, + retryInterval time.Duration, + logger *zap.Logger, + db *sql.Database, + fetcher atxFetcher, + set []types.ATXID, +) error { + total := len(set) + for { + missing, err := getMissing(db, set) + if err != nil { + return err + } + set = missing + downloaded := total - len(missing) + logger.Info("downloaded atxs", + zap.Int("total", total), + zap.Int("downloaded", downloaded), + zap.Array("missing", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { + for _, atx := range missing { + enc.AppendString(atx.ShortString()) + } + return nil + }))) + if len(missing) == 0 { + return nil + } + if err := fetcher.GetAtxs(ctx, missing); err != nil { + logger.Debug("failed to fetch atxs", zap.Error(err)) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(retryInterval + time.Duration(rand.Int63n(int64(retryInterval)))): + } + } + } +} diff --git a/syncer/atxsync/atxsync_test.go b/syncer/atxsync/atxsync_test.go new file mode 100644 index 0000000000..14c11ef0d6 --- /dev/null +++ b/syncer/atxsync/atxsync_test.go @@ -0,0 +1,126 @@ +package atxsync + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/log/logtest" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/atxs" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync/mocks" +) + +func atx(id types.ATXID) *types.VerifiedActivationTx { + atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{ + NIPostChallenge: types.NIPostChallenge{ + PublishEpoch: 1, + }, + NumUnits: 1, + }} + atx.SetID(id) + atx.SetEffectiveNumUnits(1) + atx.SetReceived(time.Now()) + copy(atx.SmesherID[:], id[:]) + vatx, err := atx.Verify(0, 1) + if err != nil { + panic(err) + } + return vatx +} + +func id(id ...byte) types.ATXID { + return types.BytesToATXID(id) +} + +type fetchRequest struct { + request []types.ATXID + result []*types.VerifiedActivationTx + error error +} + +func TestDownload(t *testing.T) { + canceled, cancel := context.WithCancel(context.Background()) + cancel() + for _, tc := range []struct { + desc string + ctx context.Context + retry time.Duration + existing []*types.VerifiedActivationTx + set []types.ATXID + fetched []fetchRequest + rst error + }{ + { + desc: "all existing", + ctx: context.Background(), + existing: []*types.VerifiedActivationTx{atx(id(1)), atx(id(2)), atx(id(3))}, + set: []types.ATXID{id(1), id(2), id(3)}, + }, + { + desc: "with multiple requests", + ctx: context.Background(), + existing: []*types.VerifiedActivationTx{atx(id(1))}, + retry: 1, + fetched: []fetchRequest{ + { + request: []types.ATXID{id(2), id(3)}, + error: errors.New("test"), + result: []*types.VerifiedActivationTx{atx(id(2))}, + }, + {request: []types.ATXID{id(3)}, result: []*types.VerifiedActivationTx{atx(id(3))}}, + }, + set: []types.ATXID{id(1), id(2), id(3)}, + }, + { + desc: "continue on error", + ctx: context.Background(), + retry: 1, + existing: []*types.VerifiedActivationTx{atx(id(1))}, + fetched: []fetchRequest{ + {request: []types.ATXID{id(2)}, error: errors.New("test")}, + {request: []types.ATXID{id(2)}, result: []*types.VerifiedActivationTx{atx(id(2))}}, + }, + set: []types.ATXID{id(1), id(2)}, + }, + { + desc: "exit on context", + ctx: canceled, + retry: 1, + existing: []*types.VerifiedActivationTx{atx(id(1))}, + fetched: []fetchRequest{ + {request: []types.ATXID{id(2)}, error: errors.New("test")}, + }, + set: []types.ATXID{id(1), id(2)}, + rst: context.Canceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + logger := logtest.New(t) + db := sql.InMemory() + ctrl := gomock.NewController(t) + fetcher := mocks.NewMockatxFetcher(ctrl) + for _, atx := range tc.existing { + require.NoError(t, atxs.Add(db, atx)) + } + for i := range tc.fetched { + req := tc.fetched[i] + fetcher.EXPECT(). + GetAtxs(tc.ctx, req.request). + Times(1). + DoAndReturn(func(_ context.Context, _ []types.ATXID) error { + for _, atx := range req.result { + require.NoError(t, atxs.Add(db, atx)) + } + return req.error + }) + } + require.Equal(t, tc.rst, Download(tc.ctx, tc.retry, logger.Zap(), db, fetcher, tc.set)) + }) + } +} diff --git a/syncer/atxsync/mocks/mocks.go b/syncer/atxsync/mocks/mocks.go new file mode 100644 index 0000000000..a5e86d876c --- /dev/null +++ b/syncer/atxsync/mocks/mocks.go @@ -0,0 +1,78 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./atxsync.go +// +// Generated by this command: +// +// mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./atxsync.go +// +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + types "github.com/spacemeshos/go-spacemesh/common/types" + gomock "go.uber.org/mock/gomock" +) + +// MockatxFetcher is a mock of atxFetcher interface. +type MockatxFetcher struct { + ctrl *gomock.Controller + recorder *MockatxFetcherMockRecorder +} + +// MockatxFetcherMockRecorder is the mock recorder for MockatxFetcher. +type MockatxFetcherMockRecorder struct { + mock *MockatxFetcher +} + +// NewMockatxFetcher creates a new mock instance. +func NewMockatxFetcher(ctrl *gomock.Controller) *MockatxFetcher { + mock := &MockatxFetcher{ctrl: ctrl} + mock.recorder = &MockatxFetcherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockatxFetcher) EXPECT() *MockatxFetcherMockRecorder { + return m.recorder +} + +// GetAtxs mocks base method. +func (m *MockatxFetcher) GetAtxs(arg0 context.Context, arg1 []types.ATXID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAtxs", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetAtxs indicates an expected call of GetAtxs. +func (mr *MockatxFetcherMockRecorder) GetAtxs(arg0, arg1 any) *atxFetcherGetAtxsCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAtxs", reflect.TypeOf((*MockatxFetcher)(nil).GetAtxs), arg0, arg1) + return &atxFetcherGetAtxsCall{Call: call} +} + +// atxFetcherGetAtxsCall wrap *gomock.Call +type atxFetcherGetAtxsCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *atxFetcherGetAtxsCall) Return(arg0 error) *atxFetcherGetAtxsCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *atxFetcherGetAtxsCall) Do(f func(context.Context, []types.ATXID) error) *atxFetcherGetAtxsCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *atxFetcherGetAtxsCall) DoAndReturn(f func(context.Context, []types.ATXID) error) *atxFetcherGetAtxsCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/syncer/syncer.go b/syncer/syncer.go index 629665838a..840bdab45a 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -28,6 +28,7 @@ type Config struct { MaxStaleDuration time.Duration Standalone bool GossipDuration time.Duration + DisableAtxReconciliation bool `mapstructure:"disable-atx-reconciliation"` OutOfSyncThresholdLayers uint32 `mapstructure:"out-of-sync-threshold"` } @@ -440,7 +441,10 @@ func (s *Syncer) syncAtx(ctx context.Context) error { return err } } - + if s.cfg.DisableAtxReconciliation { + s.logger.Debug("atx sync disabled") + return nil + } // steady state atx syncing curr := s.ticker.CurrentLayer() if float64(