Skip to content

Commit

Permalink
spawn a process to download atxs hinted by a server (#5377)
Browse files Browse the repository at this point in the history
related: #5366

this change disables sync reconciliation at the end and start of epoch, normal atx downloading works the way it was working.
instead it will ask peers for atxs that are recorded in the set, shared by the server.
  • Loading branch information
dshulyak committed Dec 21, 2023
1 parent 1f28eae commit c580fba
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 1 deletion.
1 change: 1 addition & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func MainnetConfig() Config {
Standalone: false,
GossipDuration: 50 * time.Second,
OutOfSyncThresholdLayers: 36, // 3h
DisableAtxReconciliation: true,
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
15 changes: 15 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/localsql"
dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/blockssync"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/timesync"
Expand Down Expand Up @@ -1154,6 +1155,20 @@ func (app *App) listenToUpdates(ctx context.Context) {
}
if len(update.Data.ActiveSet) > 0 {
app.hOracle.UpdateActiveSet(update.Data.Epoch, update.Data.ActiveSet)
set := update.Data.ActiveSet
app.eg.Go(func() error {
if err := atxsync.Download(
ctx,
10*time.Second,
app.addLogger(SyncLogger, app.log).Zap(),
app.db,
app.fetcher,
set,
); err != nil {
app.errCh <- err
}
return nil
})
}
}
}
Expand Down
75 changes: 75 additions & 0 deletions syncer/atxsync/atxsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package atxsync

import (
"context"
"math/rand"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)

//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./atxsync.go
type atxFetcher interface {
GetAtxs(context.Context, []types.ATXID) error
}

func getMissing(db *sql.Database, set []types.ATXID) ([]types.ATXID, error) {
missing := []types.ATXID{}
for _, atx := range set {
exist, err := atxs.Has(db, atx)
if err != nil {
return nil, err
}
if !exist {
missing = append(missing, atx)
}
}
return missing, nil
}

// Download specified set of atxs from peers in the network.
//
// actual retry interval will be between [retryInterval, 2*retryInterval].
func Download(
ctx context.Context,
retryInterval time.Duration,
logger *zap.Logger,
db *sql.Database,
fetcher atxFetcher,
set []types.ATXID,
) error {
total := len(set)
for {
missing, err := getMissing(db, set)
if err != nil {
return err
}
set = missing
downloaded := total - len(missing)
logger.Info("downloaded atxs",
zap.Int("total", total),
zap.Int("downloaded", downloaded),
zap.Array("missing", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
for _, atx := range missing {
enc.AppendString(atx.ShortString())
}
return nil
})))
if len(missing) == 0 {
return nil
}
if err := fetcher.GetAtxs(ctx, missing); err != nil {
logger.Debug("failed to fetch atxs", zap.Error(err))
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(retryInterval + time.Duration(rand.Int63n(int64(retryInterval)))):
}
}
}
}
126 changes: 126 additions & 0 deletions syncer/atxsync/atxsync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package atxsync

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync/mocks"
)

func atx(id types.ATXID) *types.VerifiedActivationTx {
atx := &types.ActivationTx{InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
PublishEpoch: 1,
},
NumUnits: 1,
}}
atx.SetID(id)
atx.SetEffectiveNumUnits(1)
atx.SetReceived(time.Now())
copy(atx.SmesherID[:], id[:])
vatx, err := atx.Verify(0, 1)
if err != nil {
panic(err)
}
return vatx
}

func id(id ...byte) types.ATXID {
return types.BytesToATXID(id)
}

type fetchRequest struct {
request []types.ATXID
result []*types.VerifiedActivationTx
error error
}

func TestDownload(t *testing.T) {
canceled, cancel := context.WithCancel(context.Background())
cancel()
for _, tc := range []struct {
desc string
ctx context.Context
retry time.Duration
existing []*types.VerifiedActivationTx
set []types.ATXID
fetched []fetchRequest
rst error
}{
{
desc: "all existing",
ctx: context.Background(),
existing: []*types.VerifiedActivationTx{atx(id(1)), atx(id(2)), atx(id(3))},
set: []types.ATXID{id(1), id(2), id(3)},
},
{
desc: "with multiple requests",
ctx: context.Background(),
existing: []*types.VerifiedActivationTx{atx(id(1))},
retry: 1,
fetched: []fetchRequest{
{
request: []types.ATXID{id(2), id(3)},
error: errors.New("test"),
result: []*types.VerifiedActivationTx{atx(id(2))},
},
{request: []types.ATXID{id(3)}, result: []*types.VerifiedActivationTx{atx(id(3))}},
},
set: []types.ATXID{id(1), id(2), id(3)},
},
{
desc: "continue on error",
ctx: context.Background(),
retry: 1,
existing: []*types.VerifiedActivationTx{atx(id(1))},
fetched: []fetchRequest{
{request: []types.ATXID{id(2)}, error: errors.New("test")},
{request: []types.ATXID{id(2)}, result: []*types.VerifiedActivationTx{atx(id(2))}},
},
set: []types.ATXID{id(1), id(2)},
},
{
desc: "exit on context",
ctx: canceled,
retry: 1,
existing: []*types.VerifiedActivationTx{atx(id(1))},
fetched: []fetchRequest{
{request: []types.ATXID{id(2)}, error: errors.New("test")},
},
set: []types.ATXID{id(1), id(2)},
rst: context.Canceled,
},
} {
t.Run(tc.desc, func(t *testing.T) {
logger := logtest.New(t)
db := sql.InMemory()
ctrl := gomock.NewController(t)
fetcher := mocks.NewMockatxFetcher(ctrl)
for _, atx := range tc.existing {
require.NoError(t, atxs.Add(db, atx))
}
for i := range tc.fetched {
req := tc.fetched[i]
fetcher.EXPECT().
GetAtxs(tc.ctx, req.request).
Times(1).
DoAndReturn(func(_ context.Context, _ []types.ATXID) error {
for _, atx := range req.result {
require.NoError(t, atxs.Add(db, atx))
}
return req.error
})
}
require.Equal(t, tc.rst, Download(tc.ctx, tc.retry, logger.Zap(), db, fetcher, tc.set))
})
}
}
78 changes: 78 additions & 0 deletions syncer/atxsync/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
MaxStaleDuration time.Duration
Standalone bool
GossipDuration time.Duration
DisableAtxReconciliation bool `mapstructure:"disable-atx-reconciliation"`
OutOfSyncThresholdLayers uint32 `mapstructure:"out-of-sync-threshold"`
}

Expand Down Expand Up @@ -440,7 +441,10 @@ func (s *Syncer) syncAtx(ctx context.Context) error {
return err
}
}

if s.cfg.DisableAtxReconciliation {
s.logger.Debug("atx sync disabled")
return nil
}
// steady state atx syncing
curr := s.ticker.CurrentLayer()
if float64(
Expand Down

0 comments on commit c580fba

Please sign in to comment.