Skip to content

Commit

Permalink
Backport of #5377 for v1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Dec 20, 2023
1 parent 0e55663 commit dc977c9
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 0 deletions.
1 change: 1 addition & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func MainnetConfig() Config {
Standalone: false,
GossipDuration: 50 * time.Second,
OutOfSyncThresholdLayers: 36, // 3h
DisableAtxSync: true,
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
12 changes: 12 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/layers"
dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/blockssync"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/timesync"
Expand Down Expand Up @@ -1102,6 +1103,17 @@ func (app *App) listenToUpdates(ctx context.Context) {
}
if len(update.Data.ActiveSet) > 0 {
app.hOracle.UpdateActiveSet(update.Data.Epoch, update.Data.ActiveSet)

Check failure on line 1106 in node/node.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/spacemeshos/go-spacemesh) (gci)
app.eg.Go(func() error {
return atxsync.Download(
ctx,
10*time.Second,
app.addLogger(SyncLogger, app.log).Zap(),
app.db,
app.fetcher,
update.Data.ActiveSet,

Check failure on line 1114 in node/node.go

View workflow job for this annotation

GitHub Actions / lint

loopclosure: loop variable update captured by func literal (govet)
)
})

Check warning on line 1116 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L1106-L1116

Added lines #L1106 - L1116 were not covered by tests
}
}
}
Expand Down
75 changes: 75 additions & 0 deletions syncer/atxsync/atxsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package atxsync

import (
"context"
"math/rand"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)

//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./atxsync.go
type atxFetcher interface {
GetAtxs(context.Context, []types.ATXID) error
}

func getMissing(db *sql.Database, set []types.ATXID) ([]types.ATXID, error) {
missing := []types.ATXID{}
for _, atx := range set {
exist, err := atxs.Has(db, atx)
if err != nil {
return nil, err
}

Check warning on line 27 in syncer/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

syncer/atxsync/atxsync.go#L26-L27

Added lines #L26 - L27 were not covered by tests
if !exist {
missing = append(missing, atx)
}
}
return missing, nil
}

// Download specified set of atxs from peers in the network.
//
// actual retry interval will be between [retryInterval, 2*retryInterval]

Check failure on line 37 in syncer/atxsync/atxsync.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
func Download(
ctx context.Context,
retryInterval time.Duration,
logger *zap.Logger,
db *sql.Database,
fetcher atxFetcher,
set []types.ATXID,
) error {
total := len(set)
for {
missing, err := getMissing(db, set)
if err != nil {
return err
}

Check warning on line 51 in syncer/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

syncer/atxsync/atxsync.go#L50-L51

Added lines #L50 - L51 were not covered by tests
set = missing
downloaded := total - len(missing)
logger.Info("downloaded atxs",
zap.Int("total", total),
zap.Int("downloaded", downloaded),
zap.Array("missing", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
for _, atx := range missing {
enc.AppendString(atx.ShortString())
}
return nil

Check warning on line 61 in syncer/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

syncer/atxsync/atxsync.go#L58-L61

Added lines #L58 - L61 were not covered by tests
})))
if downloaded == total {
return nil
}
if err := fetcher.GetAtxs(ctx, missing); err != nil {
logger.Debug("failed to fetch atxs", zap.Error(err))
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(retryInterval + time.Duration(rand.Int63n(int64(retryInterval)))):
}
}
}
}
122 changes: 122 additions & 0 deletions syncer/atxsync/atxsync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package atxsync

import (
"context"
"errors"
"testing"
"time"

Check failure on line 8 in syncer/atxsync/atxsync_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/spacemeshos/go-spacemesh) (gci)
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync/mocks"
"github.com/stretchr/testify/require"

Check failure on line 14 in syncer/atxsync/atxsync_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/spacemeshos/go-spacemesh) (gci)
"go.uber.org/mock/gomock"
)

func atx(id types.ATXID) *types.VerifiedActivationTx {
atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
PublishEpoch: 1,
},
NumUnits: 1,
}}
atx.SetID(id)
atx.SetEffectiveNumUnits(1)
atx.SetReceived(time.Now())
copy(atx.SmesherID[:], id[:])
vatx, err := atx.Verify(0, 1)
if err != nil {
panic(err)
}
return vatx
}

func id(id ...byte) types.ATXID {
var atxid types.ATXID
copy(atxid[:], id)
return atxid
}

type fetchRequest struct {
request []types.ATXID
result []*types.VerifiedActivationTx
error error
}

func TestDownload(t *testing.T) {
canceled, cancel := context.WithCancel(context.Background())
cancel()
for _, tc := range []struct {
desc string
ctx context.Context
retry time.Duration
existing []*types.VerifiedActivationTx
set []types.ATXID
fetched []fetchRequest
rst error
}{
{
desc: "all existing",
ctx: context.Background(),
existing: []*types.VerifiedActivationTx{atx(id(1)), atx(id(2)), atx(id(3))},
set: []types.ATXID{id(1), id(2), id(3)},
},
{
desc: "with multiple requests",
ctx: context.Background(),
existing: []*types.VerifiedActivationTx{atx(id(1))},
fetched: []fetchRequest{
{request: []types.ATXID{id(2), id(3)}, result: []*types.VerifiedActivationTx{atx(id(2))}},
{request: []types.ATXID{id(3)}, result: []*types.VerifiedActivationTx{atx(id(3))}},
},
set: []types.ATXID{id(1), id(2), id(3)},
},
{
desc: "continue on error",
ctx: context.Background(),
retry: 1,
existing: []*types.VerifiedActivationTx{atx(id(1))},
fetched: []fetchRequest{
{request: []types.ATXID{id(2)}, error: errors.New("test")},
{request: []types.ATXID{id(2)}, result: []*types.VerifiedActivationTx{atx(id(2))}},
},
set: []types.ATXID{id(1), id(2)},
},
{
desc: "exit on context",
ctx: canceled,
retry: 1,
existing: []*types.VerifiedActivationTx{atx(id(1))},
fetched: []fetchRequest{
{request: []types.ATXID{id(2)}, error: errors.New("test")},
},
set: []types.ATXID{id(1), id(2)},
rst: context.Canceled,
},
} {
t.Run(tc.desc, func(t *testing.T) {
logger := logtest.New(t)
db := sql.InMemory()
ctrl := gomock.NewController(t)
fetcher := mocks.NewMockatxFetcher(ctrl)
for _, atx := range tc.existing {
require.NoError(t, atxs.Add(db, atx))
}
for i := range tc.fetched {
req := tc.fetched[i]
fetcher.EXPECT().
GetAtxs(tc.ctx, req.request).
Times(1).
DoAndReturn(func(_ context.Context, _ []types.ATXID) error {
for _, atx := range req.result {
require.NoError(t, atxs.Add(db, atx))
}
return req.error
})
}
require.Equal(t, tc.rst, Download(tc.ctx, tc.retry, logger.Zap(), db, fetcher, tc.set))
})
}
}
78 changes: 78 additions & 0 deletions syncer/atxsync/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
Standalone bool
GossipDuration time.Duration
OutOfSyncThresholdLayers uint32 `mapstructure:"out-of-sync-threshold"`
DisableAtxSync bool
}

// DefaultConfig for the syncer.
Expand Down Expand Up @@ -443,6 +444,11 @@ func (s *Syncer) syncAtx(ctx context.Context) error {
}
}

if s.cfg.DisableAtxSync {
s.logger.Debug("atx sync disabled")
return nil
}

Check warning on line 450 in syncer/syncer.go

View check run for this annotation

Codecov / codecov/patch

syncer/syncer.go#L448-L450

Added lines #L448 - L450 were not covered by tests

// steady state atx syncing
curr := s.ticker.CurrentLayer()
if float64(
Expand Down

0 comments on commit dc977c9

Please sign in to comment.