From c0c3aff21a0376ceb52c8880c32b81d6d208ef95 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 21 Dec 2023 10:37:25 +0000 Subject: [PATCH] spawn a process to download atxs hinted by a server (#5377) related: https://github.com/spacemeshos/go-spacemesh/issues/5366 this change disables sync reconciliation at the end and start of epoch, normal atx downloading works the way it was working. instead it will ask peers for atxs that are recorded in the set, shared by the server. --- config/mainnet.go | 1 + node/node.go | 15 ++++ syncer/atxsync/atxsync.go | 75 ++++++++++++++++++++ syncer/atxsync/atxsync_test.go | 126 +++++++++++++++++++++++++++++++++ syncer/atxsync/mocks/mocks.go | 78 ++++++++++++++++++++ syncer/syncer.go | 6 +- 6 files changed, 300 insertions(+), 1 deletion(-) create mode 100644 syncer/atxsync/atxsync.go create mode 100644 syncer/atxsync/atxsync_test.go create mode 100644 syncer/atxsync/mocks/mocks.go diff --git a/config/mainnet.go b/config/mainnet.go index 587c99548e..337fdc1cdc 100644 --- a/config/mainnet.go +++ b/config/mainnet.go @@ -167,6 +167,7 @@ func MainnetConfig() Config { Standalone: false, GossipDuration: 50 * time.Second, OutOfSyncThresholdLayers: 36, // 3h + DisableAtxReconciliation: true, }, Recovery: checkpoint.DefaultConfig(), Cache: datastore.DefaultConfig(), diff --git a/node/node.go b/node/node.go index 34b07e3a80..a90b1e9a17 100644 --- a/node/node.go +++ b/node/node.go @@ -70,6 +70,7 @@ import ( "github.com/spacemeshos/go-spacemesh/sql/layers" dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics" "github.com/spacemeshos/go-spacemesh/syncer" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync" "github.com/spacemeshos/go-spacemesh/syncer/blockssync" "github.com/spacemeshos/go-spacemesh/system" "github.com/spacemeshos/go-spacemesh/timesync" @@ -1102,6 +1103,20 @@ func (app *App) listenToUpdates(ctx context.Context) { } if len(update.Data.ActiveSet) > 0 { app.hOracle.UpdateActiveSet(update.Data.Epoch, update.Data.ActiveSet) + set := update.Data.ActiveSet + app.eg.Go(func() error { + if err := atxsync.Download( + ctx, + 10*time.Second, + app.addLogger(SyncLogger, app.log).Zap(), + app.db, + app.fetcher, + set, + ); err != nil { + app.errCh <- err + } + return nil + }) } } } diff --git a/syncer/atxsync/atxsync.go b/syncer/atxsync/atxsync.go new file mode 100644 index 0000000000..c889157e6c --- /dev/null +++ b/syncer/atxsync/atxsync.go @@ -0,0 +1,75 @@ +package atxsync + +import ( + "context" + "math/rand" + "time" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/atxs" +) + +//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./atxsync.go +type atxFetcher interface { + GetAtxs(context.Context, []types.ATXID) error +} + +func getMissing(db *sql.Database, set []types.ATXID) ([]types.ATXID, error) { + missing := []types.ATXID{} + for _, atx := range set { + exist, err := atxs.Has(db, atx) + if err != nil { + return nil, err + } + if !exist { + missing = append(missing, atx) + } + } + return missing, nil +} + +// Download specified set of atxs from peers in the network. +// +// actual retry interval will be between [retryInterval, 2*retryInterval]. +func Download( + ctx context.Context, + retryInterval time.Duration, + logger *zap.Logger, + db *sql.Database, + fetcher atxFetcher, + set []types.ATXID, +) error { + total := len(set) + for { + missing, err := getMissing(db, set) + if err != nil { + return err + } + set = missing + downloaded := total - len(missing) + logger.Info("downloaded atxs", + zap.Int("total", total), + zap.Int("downloaded", downloaded), + zap.Array("missing", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { + for _, atx := range missing { + enc.AppendString(atx.ShortString()) + } + return nil + }))) + if len(missing) == 0 { + return nil + } + if err := fetcher.GetAtxs(ctx, missing); err != nil { + logger.Debug("failed to fetch atxs", zap.Error(err)) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(retryInterval + time.Duration(rand.Int63n(int64(retryInterval)))): + } + } + } +} diff --git a/syncer/atxsync/atxsync_test.go b/syncer/atxsync/atxsync_test.go new file mode 100644 index 0000000000..14c11ef0d6 --- /dev/null +++ b/syncer/atxsync/atxsync_test.go @@ -0,0 +1,126 @@ +package atxsync + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/log/logtest" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/atxs" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync/mocks" +) + +func atx(id types.ATXID) *types.VerifiedActivationTx { + atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{ + NIPostChallenge: types.NIPostChallenge{ + PublishEpoch: 1, + }, + NumUnits: 1, + }} + atx.SetID(id) + atx.SetEffectiveNumUnits(1) + atx.SetReceived(time.Now()) + copy(atx.SmesherID[:], id[:]) + vatx, err := atx.Verify(0, 1) + if err != nil { + panic(err) + } + return vatx +} + +func id(id ...byte) types.ATXID { + return types.BytesToATXID(id) +} + +type fetchRequest struct { + request []types.ATXID + result []*types.VerifiedActivationTx + error error +} + +func TestDownload(t *testing.T) { + canceled, cancel := context.WithCancel(context.Background()) + cancel() + for _, tc := range []struct { + desc string + ctx context.Context + retry time.Duration + existing []*types.VerifiedActivationTx + set []types.ATXID + fetched []fetchRequest + rst error + }{ + { + desc: "all existing", + ctx: context.Background(), + existing: []*types.VerifiedActivationTx{atx(id(1)), atx(id(2)), atx(id(3))}, + set: []types.ATXID{id(1), id(2), id(3)}, + }, + { + desc: "with multiple requests", + ctx: context.Background(), + existing: []*types.VerifiedActivationTx{atx(id(1))}, + retry: 1, + fetched: []fetchRequest{ + { + request: []types.ATXID{id(2), id(3)}, + error: errors.New("test"), + result: []*types.VerifiedActivationTx{atx(id(2))}, + }, + {request: []types.ATXID{id(3)}, result: []*types.VerifiedActivationTx{atx(id(3))}}, + }, + set: []types.ATXID{id(1), id(2), id(3)}, + }, + { + desc: "continue on error", + ctx: context.Background(), + retry: 1, + existing: []*types.VerifiedActivationTx{atx(id(1))}, + fetched: []fetchRequest{ + {request: []types.ATXID{id(2)}, error: errors.New("test")}, + {request: []types.ATXID{id(2)}, result: []*types.VerifiedActivationTx{atx(id(2))}}, + }, + set: []types.ATXID{id(1), id(2)}, + }, + { + desc: "exit on context", + ctx: canceled, + retry: 1, + existing: []*types.VerifiedActivationTx{atx(id(1))}, + fetched: []fetchRequest{ + {request: []types.ATXID{id(2)}, error: errors.New("test")}, + }, + set: []types.ATXID{id(1), id(2)}, + rst: context.Canceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + logger := logtest.New(t) + db := sql.InMemory() + ctrl := gomock.NewController(t) + fetcher := mocks.NewMockatxFetcher(ctrl) + for _, atx := range tc.existing { + require.NoError(t, atxs.Add(db, atx)) + } + for i := range tc.fetched { + req := tc.fetched[i] + fetcher.EXPECT(). + GetAtxs(tc.ctx, req.request). + Times(1). + DoAndReturn(func(_ context.Context, _ []types.ATXID) error { + for _, atx := range req.result { + require.NoError(t, atxs.Add(db, atx)) + } + return req.error + }) + } + require.Equal(t, tc.rst, Download(tc.ctx, tc.retry, logger.Zap(), db, fetcher, tc.set)) + }) + } +} diff --git a/syncer/atxsync/mocks/mocks.go b/syncer/atxsync/mocks/mocks.go new file mode 100644 index 0000000000..a5e86d876c --- /dev/null +++ b/syncer/atxsync/mocks/mocks.go @@ -0,0 +1,78 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./atxsync.go +// +// Generated by this command: +// +// mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./atxsync.go +// +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + types "github.com/spacemeshos/go-spacemesh/common/types" + gomock "go.uber.org/mock/gomock" +) + +// MockatxFetcher is a mock of atxFetcher interface. +type MockatxFetcher struct { + ctrl *gomock.Controller + recorder *MockatxFetcherMockRecorder +} + +// MockatxFetcherMockRecorder is the mock recorder for MockatxFetcher. +type MockatxFetcherMockRecorder struct { + mock *MockatxFetcher +} + +// NewMockatxFetcher creates a new mock instance. +func NewMockatxFetcher(ctrl *gomock.Controller) *MockatxFetcher { + mock := &MockatxFetcher{ctrl: ctrl} + mock.recorder = &MockatxFetcherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockatxFetcher) EXPECT() *MockatxFetcherMockRecorder { + return m.recorder +} + +// GetAtxs mocks base method. +func (m *MockatxFetcher) GetAtxs(arg0 context.Context, arg1 []types.ATXID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAtxs", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetAtxs indicates an expected call of GetAtxs. +func (mr *MockatxFetcherMockRecorder) GetAtxs(arg0, arg1 any) *atxFetcherGetAtxsCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAtxs", reflect.TypeOf((*MockatxFetcher)(nil).GetAtxs), arg0, arg1) + return &atxFetcherGetAtxsCall{Call: call} +} + +// atxFetcherGetAtxsCall wrap *gomock.Call +type atxFetcherGetAtxsCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *atxFetcherGetAtxsCall) Return(arg0 error) *atxFetcherGetAtxsCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *atxFetcherGetAtxsCall) Do(f func(context.Context, []types.ATXID) error) *atxFetcherGetAtxsCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *atxFetcherGetAtxsCall) DoAndReturn(f func(context.Context, []types.ATXID) error) *atxFetcherGetAtxsCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/syncer/syncer.go b/syncer/syncer.go index 3813624145..6a6673ab47 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -28,6 +28,7 @@ type Config struct { MaxStaleDuration time.Duration Standalone bool GossipDuration time.Duration + DisableAtxReconciliation bool `mapstructure:"disable-atx-reconciliation"` OutOfSyncThresholdLayers uint32 `mapstructure:"out-of-sync-threshold"` } @@ -442,7 +443,10 @@ func (s *Syncer) syncAtx(ctx context.Context) error { return err } } - + if s.cfg.DisableAtxReconciliation { + s.logger.Debug("atx sync disabled") + return nil + } // steady state atx syncing curr := s.ticker.CurrentLayer() if float64(