Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - spawn a process to download atxs hinted by a server #5377

Closed
wants to merge 12 commits into from
1 change: 1 addition & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func MainnetConfig() Config {
Standalone: false,
GossipDuration: 50 * time.Second,
OutOfSyncThresholdLayers: 36, // 3h
DisableAtxReconciliation: true,
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
15 changes: 15 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"github.com/spacemeshos/go-spacemesh/sql/localsql"
dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/blockssync"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/timesync"
Expand Down Expand Up @@ -1154,6 +1155,20 @@
}
if len(update.Data.ActiveSet) > 0 {
app.hOracle.UpdateActiveSet(update.Data.Epoch, update.Data.ActiveSet)
set := update.Data.ActiveSet
app.eg.Go(func() error {
if err := atxsync.Download(
ctx,
10*time.Second,
app.addLogger(SyncLogger, app.log).Zap(),
app.db,
app.fetcher,
set,
); err != nil {
app.errCh <- err
}
return nil

Check warning on line 1170 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L1158-L1170

Added lines #L1158 - L1170 were not covered by tests
})
}
}
}
Expand Down
75 changes: 75 additions & 0 deletions syncer/atxsync/atxsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package atxsync

import (
"context"
"math/rand"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)

//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./atxsync.go
type atxFetcher interface {
GetAtxs(context.Context, []types.ATXID) error
}

func getMissing(db *sql.Database, set []types.ATXID) ([]types.ATXID, error) {
missing := []types.ATXID{}
for _, atx := range set {
exist, err := atxs.Has(db, atx)
if err != nil {
return nil, err
}

Check warning on line 27 in syncer/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

syncer/atxsync/atxsync.go#L26-L27

Added lines #L26 - L27 were not covered by tests
if !exist {
missing = append(missing, atx)
}
}
return missing, nil
}

// Download specified set of atxs from peers in the network.
//
// actual retry interval will be between [retryInterval, 2*retryInterval].
func Download(
ctx context.Context,
retryInterval time.Duration,
logger *zap.Logger,
db *sql.Database,
fetcher atxFetcher,
set []types.ATXID,
) error {
total := len(set)
for {
missing, err := getMissing(db, set)
if err != nil {
return err
}

Check warning on line 51 in syncer/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

syncer/atxsync/atxsync.go#L50-L51

Added lines #L50 - L51 were not covered by tests
dshulyak marked this conversation as resolved.
Show resolved Hide resolved
set = missing
downloaded := total - len(missing)
logger.Info("downloaded atxs",
zap.Int("total", total),
zap.Int("downloaded", downloaded),
zap.Array("missing", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
for _, atx := range missing {
enc.AppendString(atx.ShortString())
}
return nil

Check warning on line 61 in syncer/atxsync/atxsync.go

View check run for this annotation

Codecov / codecov/patch

syncer/atxsync/atxsync.go#L58-L61

Added lines #L58 - L61 were not covered by tests
})))
if len(missing) == 0 {
return nil
}
if err := fetcher.GetAtxs(ctx, missing); err != nil {
logger.Debug("failed to fetch atxs", zap.Error(err))
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(retryInterval + time.Duration(rand.Int63n(int64(retryInterval)))):
}
}
}
}
126 changes: 126 additions & 0 deletions syncer/atxsync/atxsync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package atxsync

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync/mocks"
)

func atx(id types.ATXID) *types.VerifiedActivationTx {
atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
PublishEpoch: 1,
},
NumUnits: 1,
}}
atx.SetID(id)
atx.SetEffectiveNumUnits(1)
atx.SetReceived(time.Now())
copy(atx.SmesherID[:], id[:])
vatx, err := atx.Verify(0, 1)
if err != nil {
panic(err)
}
return vatx
}

func id(id ...byte) types.ATXID {
return types.BytesToATXID(id)
}

type fetchRequest struct {
request []types.ATXID
result []*types.VerifiedActivationTx
error error
}

func TestDownload(t *testing.T) {
canceled, cancel := context.WithCancel(context.Background())
cancel()
for _, tc := range []struct {
desc string
ctx context.Context
retry time.Duration
existing []*types.VerifiedActivationTx
set []types.ATXID
fetched []fetchRequest
rst error
}{
{
desc: "all existing",
ctx: context.Background(),
existing: []*types.VerifiedActivationTx{atx(id(1)), atx(id(2)), atx(id(3))},
set: []types.ATXID{id(1), id(2), id(3)},
},
{
desc: "with multiple requests",
ctx: context.Background(),
existing: []*types.VerifiedActivationTx{atx(id(1))},
retry: 1,
fetched: []fetchRequest{
{
request: []types.ATXID{id(2), id(3)},
error: errors.New("test"),
result: []*types.VerifiedActivationTx{atx(id(2))},
},
{request: []types.ATXID{id(3)}, result: []*types.VerifiedActivationTx{atx(id(3))}},
},
set: []types.ATXID{id(1), id(2), id(3)},
},
{
desc: "continue on error",
ctx: context.Background(),
retry: 1,
existing: []*types.VerifiedActivationTx{atx(id(1))},
fetched: []fetchRequest{
{request: []types.ATXID{id(2)}, error: errors.New("test")},
{request: []types.ATXID{id(2)}, result: []*types.VerifiedActivationTx{atx(id(2))}},
},
set: []types.ATXID{id(1), id(2)},
},
{
desc: "exit on context",
ctx: canceled,
retry: 1,
existing: []*types.VerifiedActivationTx{atx(id(1))},
fetched: []fetchRequest{
{request: []types.ATXID{id(2)}, error: errors.New("test")},
},
set: []types.ATXID{id(1), id(2)},
rst: context.Canceled,
},
} {
t.Run(tc.desc, func(t *testing.T) {
logger := logtest.New(t)
db := sql.InMemory()
ctrl := gomock.NewController(t)
fetcher := mocks.NewMockatxFetcher(ctrl)
for _, atx := range tc.existing {
require.NoError(t, atxs.Add(db, atx))
}
for i := range tc.fetched {
req := tc.fetched[i]
fetcher.EXPECT().
GetAtxs(tc.ctx, req.request).
Times(1).
DoAndReturn(func(_ context.Context, _ []types.ATXID) error {
for _, atx := range req.result {
require.NoError(t, atxs.Add(db, atx))
}
return req.error
})
}
require.Equal(t, tc.rst, Download(tc.ctx, tc.retry, logger.Zap(), db, fetcher, tc.set))
})
}
}
78 changes: 78 additions & 0 deletions syncer/atxsync/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
MaxStaleDuration time.Duration
Standalone bool
GossipDuration time.Duration
DisableAtxReconciliation bool `mapstructure:"disable-atx-reconciliation"`
OutOfSyncThresholdLayers uint32 `mapstructure:"out-of-sync-threshold"`
}

Expand Down Expand Up @@ -440,7 +441,10 @@
return err
}
}

if s.cfg.DisableAtxReconciliation {
s.logger.Debug("atx sync disabled")
return nil
}

Check warning on line 447 in syncer/syncer.go

View check run for this annotation

Codecov / codecov/patch

syncer/syncer.go#L445-L447

Added lines #L445 - L447 were not covered by tests
// steady state atx syncing
curr := s.ticker.CurrentLayer()
if float64(
Expand Down