From b8c398b82590c1021b7853d2d5a62d5c4f132fbd Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 21 Dec 2023 09:57:07 +0000 Subject: [PATCH] spawn a process to download atxs hinted by a server (#5377) related: https://github.com/spacemeshos/go-spacemesh/issues/5366 this change disables sync reconciliation at the end and start of epoch, normal atx downloading works the way it was working. instead it will ask peers for atxs that are recorded in the set, shared by the server. --- config/mainnet.go | 1 + node/node.go | 15 ++++ syncer/atxsync/atxsync.go | 75 ++++++++++++++++++++ syncer/atxsync/atxsync_test.go | 126 +++++++++++++++++++++++++++++++++ syncer/atxsync/mocks/mocks.go | 78 ++++++++++++++++++++ syncer/syncer.go | 6 +- 6 files changed, 300 insertions(+), 1 deletion(-) create mode 100644 syncer/atxsync/atxsync.go create mode 100644 syncer/atxsync/atxsync_test.go create mode 100644 syncer/atxsync/mocks/mocks.go diff --git a/config/mainnet.go b/config/mainnet.go index 6c42334c608..bc21d264100 100644 --- a/config/mainnet.go +++ b/config/mainnet.go @@ -172,6 +172,7 @@ func MainnetConfig() Config { Standalone: false, GossipDuration: 50 * time.Second, OutOfSyncThresholdLayers: 36, // 3h + DisableAtxReconciliation: true, }, Recovery: checkpoint.DefaultConfig(), Cache: datastore.DefaultConfig(), diff --git a/node/node.go b/node/node.go index cf7ac874ae3..86ead1b394a 100644 --- a/node/node.go +++ b/node/node.go @@ -69,6 +69,7 @@ import ( "github.com/spacemeshos/go-spacemesh/sql/localsql" dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics" "github.com/spacemeshos/go-spacemesh/syncer" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync" "github.com/spacemeshos/go-spacemesh/syncer/blockssync" "github.com/spacemeshos/go-spacemesh/system" "github.com/spacemeshos/go-spacemesh/timesync" @@ -1154,6 +1155,20 @@ func (app *App) listenToUpdates(ctx context.Context) { } if len(update.Data.ActiveSet) > 0 { app.hOracle.UpdateActiveSet(update.Data.Epoch, update.Data.ActiveSet) + set := update.Data.ActiveSet + app.eg.Go(func() error { + if err := atxsync.Download( + ctx, + 10*time.Second, + app.addLogger(SyncLogger, app.log).Zap(), + app.db, + app.fetcher, + set, + ); err != nil { + app.errCh <- err + } + return nil + }) } } } diff --git a/syncer/atxsync/atxsync.go b/syncer/atxsync/atxsync.go new file mode 100644 index 00000000000..c889157e6c1 --- /dev/null +++ b/syncer/atxsync/atxsync.go @@ -0,0 +1,75 @@ +package atxsync + +import ( + "context" + "math/rand" + "time" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/atxs" +) + +//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./atxsync.go +type atxFetcher interface { + GetAtxs(context.Context, []types.ATXID) error +} + +func getMissing(db *sql.Database, set []types.ATXID) ([]types.ATXID, error) { + missing := []types.ATXID{} + for _, atx := range set { + exist, err := atxs.Has(db, atx) + if err != nil { + return nil, err + } + if !exist { + missing = append(missing, atx) + } + } + return missing, nil +} + +// Download specified set of atxs from peers in the network. +// +// actual retry interval will be between [retryInterval, 2*retryInterval]. +func Download( + ctx context.Context, + retryInterval time.Duration, + logger *zap.Logger, + db *sql.Database, + fetcher atxFetcher, + set []types.ATXID, +) error { + total := len(set) + for { + missing, err := getMissing(db, set) + if err != nil { + return err + } + set = missing + downloaded := total - len(missing) + logger.Info("downloaded atxs", + zap.Int("total", total), + zap.Int("downloaded", downloaded), + zap.Array("missing", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { + for _, atx := range missing { + enc.AppendString(atx.ShortString()) + } + return nil + }))) + if len(missing) == 0 { + return nil + } + if err := fetcher.GetAtxs(ctx, missing); err != nil { + logger.Debug("failed to fetch atxs", zap.Error(err)) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(retryInterval + time.Duration(rand.Int63n(int64(retryInterval)))): + } + } + } +} diff --git a/syncer/atxsync/atxsync_test.go b/syncer/atxsync/atxsync_test.go new file mode 100644 index 00000000000..14c11ef0d68 --- /dev/null +++ b/syncer/atxsync/atxsync_test.go @@ -0,0 +1,126 @@ +package atxsync + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/log/logtest" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/atxs" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync/mocks" +) + +func atx(id types.ATXID) *types.VerifiedActivationTx { + atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{ + NIPostChallenge: types.NIPostChallenge{ + PublishEpoch: 1, + }, + NumUnits: 1, + }} + atx.SetID(id) + atx.SetEffectiveNumUnits(1) + atx.SetReceived(time.Now()) + copy(atx.SmesherID[:], id[:]) + vatx, err := atx.Verify(0, 1) + if err != nil { + panic(err) + } + return vatx +} + +func id(id ...byte) types.ATXID { + return types.BytesToATXID(id) +} + +type fetchRequest struct { + request []types.ATXID + result []*types.VerifiedActivationTx + error error +} + +func TestDownload(t *testing.T) { + canceled, cancel := context.WithCancel(context.Background()) + cancel() + for _, tc := range []struct { + desc string + ctx context.Context + retry time.Duration + existing []*types.VerifiedActivationTx + set []types.ATXID + fetched []fetchRequest + rst error + }{ + { + desc: "all existing", + ctx: context.Background(), + existing: []*types.VerifiedActivationTx{atx(id(1)), atx(id(2)), atx(id(3))}, + set: []types.ATXID{id(1), id(2), id(3)}, + }, + { + desc: "with multiple requests", + ctx: context.Background(), + existing: []*types.VerifiedActivationTx{atx(id(1))}, + retry: 1, + fetched: []fetchRequest{ + { + request: []types.ATXID{id(2), id(3)}, + error: errors.New("test"), + result: []*types.VerifiedActivationTx{atx(id(2))}, + }, + {request: []types.ATXID{id(3)}, result: []*types.VerifiedActivationTx{atx(id(3))}}, + }, + set: []types.ATXID{id(1), id(2), id(3)}, + }, + { + desc: "continue on error", + ctx: context.Background(), + retry: 1, + existing: []*types.VerifiedActivationTx{atx(id(1))}, + fetched: []fetchRequest{ + {request: []types.ATXID{id(2)}, error: errors.New("test")}, + {request: []types.ATXID{id(2)}, result: []*types.VerifiedActivationTx{atx(id(2))}}, + }, + set: []types.ATXID{id(1), id(2)}, + }, + { + desc: "exit on context", + ctx: canceled, + retry: 1, + existing: []*types.VerifiedActivationTx{atx(id(1))}, + fetched: []fetchRequest{ + {request: []types.ATXID{id(2)}, error: errors.New("test")}, + }, + set: []types.ATXID{id(1), id(2)}, + rst: context.Canceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + logger := logtest.New(t) + db := sql.InMemory() + ctrl := gomock.NewController(t) + fetcher := mocks.NewMockatxFetcher(ctrl) + for _, atx := range tc.existing { + require.NoError(t, atxs.Add(db, atx)) + } + for i := range tc.fetched { + req := tc.fetched[i] + fetcher.EXPECT(). + GetAtxs(tc.ctx, req.request). + Times(1). + DoAndReturn(func(_ context.Context, _ []types.ATXID) error { + for _, atx := range req.result { + require.NoError(t, atxs.Add(db, atx)) + } + return req.error + }) + } + require.Equal(t, tc.rst, Download(tc.ctx, tc.retry, logger.Zap(), db, fetcher, tc.set)) + }) + } +} diff --git a/syncer/atxsync/mocks/mocks.go b/syncer/atxsync/mocks/mocks.go new file mode 100644 index 00000000000..a5e86d876cf --- /dev/null +++ b/syncer/atxsync/mocks/mocks.go @@ -0,0 +1,78 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./atxsync.go +// +// Generated by this command: +// +// mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./atxsync.go +// +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + types "github.com/spacemeshos/go-spacemesh/common/types" + gomock "go.uber.org/mock/gomock" +) + +// MockatxFetcher is a mock of atxFetcher interface. +type MockatxFetcher struct { + ctrl *gomock.Controller + recorder *MockatxFetcherMockRecorder +} + +// MockatxFetcherMockRecorder is the mock recorder for MockatxFetcher. +type MockatxFetcherMockRecorder struct { + mock *MockatxFetcher +} + +// NewMockatxFetcher creates a new mock instance. +func NewMockatxFetcher(ctrl *gomock.Controller) *MockatxFetcher { + mock := &MockatxFetcher{ctrl: ctrl} + mock.recorder = &MockatxFetcherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockatxFetcher) EXPECT() *MockatxFetcherMockRecorder { + return m.recorder +} + +// GetAtxs mocks base method. +func (m *MockatxFetcher) GetAtxs(arg0 context.Context, arg1 []types.ATXID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAtxs", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetAtxs indicates an expected call of GetAtxs. +func (mr *MockatxFetcherMockRecorder) GetAtxs(arg0, arg1 any) *atxFetcherGetAtxsCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAtxs", reflect.TypeOf((*MockatxFetcher)(nil).GetAtxs), arg0, arg1) + return &atxFetcherGetAtxsCall{Call: call} +} + +// atxFetcherGetAtxsCall wrap *gomock.Call +type atxFetcherGetAtxsCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *atxFetcherGetAtxsCall) Return(arg0 error) *atxFetcherGetAtxsCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *atxFetcherGetAtxsCall) Do(f func(context.Context, []types.ATXID) error) *atxFetcherGetAtxsCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *atxFetcherGetAtxsCall) DoAndReturn(f func(context.Context, []types.ATXID) error) *atxFetcherGetAtxsCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/syncer/syncer.go b/syncer/syncer.go index 629665838af..840bdab45a7 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -28,6 +28,7 @@ type Config struct { MaxStaleDuration time.Duration Standalone bool GossipDuration time.Duration + DisableAtxReconciliation bool `mapstructure:"disable-atx-reconciliation"` OutOfSyncThresholdLayers uint32 `mapstructure:"out-of-sync-threshold"` } @@ -440,7 +441,10 @@ func (s *Syncer) syncAtx(ctx context.Context) error { return err } } - + if s.cfg.DisableAtxReconciliation { + s.logger.Debug("atx sync disabled") + return nil + } // steady state atx syncing curr := s.ticker.CurrentLayer() if float64(