Skip to content

Commit

Permalink
Sync malfeasance proofs continuously (#5718)
Browse files Browse the repository at this point in the history
## Motivation

Need to sync malfeasance proofs continuously to facilitate distributed verification. See #5306
  • Loading branch information
ivan4th committed Mar 22, 2024
1 parent 3649c2d commit ea8fd8c
Show file tree
Hide file tree
Showing 23 changed files with 1,461 additions and 257 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ See [RELEASE](./RELEASE.md) for workflow instructions.
* [#5562](https://github.com/spacemeshos/go-spacemesh/pull/5562) Add streaming mode for fetcher. This should lessen
GC pressure during sync

* [#5718](https://github.com/spacemeshos/go-spacemesh/pull/5718) Sync malfeasance proofs continuously.

## Release v1.4.0

### Upgrade information
Expand Down
8 changes: 6 additions & 2 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/atxsync"
"github.com/spacemeshos/go-spacemesh/sql/localsql"
"github.com/spacemeshos/go-spacemesh/sql/localsql/nipost"
"github.com/spacemeshos/go-spacemesh/sql/malsync"
"github.com/spacemeshos/go-spacemesh/sql/poets"
"github.com/spacemeshos/go-spacemesh/sql/recovery"
)
Expand Down Expand Up @@ -116,9 +117,12 @@ func Recover(
return nil, fmt.Errorf("open old local database: %w", err)
}
defer localDB.Close()
logger.With().Info("clearing atx sync metadata from local database")
logger.With().Info("clearing atx and malfeasance sync metadata from local database")
if err := localDB.WithTx(ctx, func(tx *sql.Tx) error {
return atxsync.Clear(tx)
if err := atxsync.Clear(tx); err != nil {
return err
}
return malsync.Clear(tx)
}); err != nil {
return nil, fmt.Errorf("clear atxsync: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
timeConfig "github.com/spacemeshos/go-spacemesh/timesync/config"
"github.com/spacemeshos/go-spacemesh/tortoise"
)
Expand Down Expand Up @@ -191,6 +192,7 @@ func MainnetConfig() Config {
OutOfSyncThresholdLayers: 36, // 3h
DisableMeshAgreement: true,
AtxSync: atxsync.DefaultConfig(),
MalSync: malsync.DefaultConfig(),
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
1 change: 1 addition & 0 deletions config/presets/fastnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func fastnet() config.Config {
conf.Sync.Interval = 5 * time.Second
conf.Sync.GossipDuration = 10 * time.Second
conf.Sync.AtxSync.EpochInfoInterval = 20 * time.Second
conf.Sync.MalSync.IDRequestInterval = 20 * time.Second
conf.LayersPerEpoch = 4
conf.RegossipAtxInterval = 30 * time.Second
conf.FETCH.RequestTimeout = 2 * time.Second
Expand Down
21 changes: 12 additions & 9 deletions fetch/mesh_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,27 +226,30 @@ func (f *Fetch) GetPoetProof(ctx context.Context, id types.Hash32) error {
}
}

func (f *Fetch) GetMaliciousIDs(ctx context.Context, peer p2p.Peer) ([]byte, error) {
func (f *Fetch) GetMaliciousIDs(ctx context.Context, peer p2p.Peer) ([]types.NodeID, error) {
var malIDs MaliciousIDs
if f.cfg.Streaming {
var b []byte
if err := f.meteredStreamRequest(
ctx, malProtocol, peer, []byte{},
func(ctx context.Context, s io.ReadWriter) (int, error) {
return server.ReadResponse(s, func(respLen uint32) (n int, err error) {
b = make([]byte, respLen)
if _, err := io.ReadFull(s, b); err != nil {
return 0, err
}
return int(respLen), nil
return codec.DecodeFrom(s, &malIDs)
})
},
); err != nil {
return nil, err
}
return b, nil
} else {
return f.meteredRequest(ctx, malProtocol, peer, []byte{})
data, err := f.meteredRequest(ctx, malProtocol, peer, []byte{})
if err != nil {
return nil, err
}
if err := codec.Decode(data, &malIDs); err != nil {
return nil, err
}
}
f.RegisterPeerHashes(peer, types.NodeIDsToHashes(malIDs.NodeIDs))
return malIDs.NodeIDs, nil
}

// GetLayerData get layer data from peers.
Expand Down
16 changes: 8 additions & 8 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,13 @@ func startTestLoop(t *testing.T, f *Fetch, eg *errgroup.Group, stop chan struct{
})
}

func generateMaliciousIDs(t *testing.T) []byte {
func generateMaliciousIDs(t *testing.T) []types.NodeID {
t.Helper()
var malicious MaliciousIDs
for i := 0; i < numMalicious; i++ {
malicious.NodeIDs = append(malicious.NodeIDs, types.RandomNodeID())
malIDs := make([]types.NodeID, numMalicious)
for i := range malIDs {
malIDs[i] = types.RandomNodeID()
}
data, err := codec.Encode(&malicious)
require.NoError(t, err)
return data
return malIDs
}

func generateLayerContent(t *testing.T) []byte {
Expand Down Expand Up @@ -511,7 +509,9 @@ func TestFetch_GetMaliciousIDs(t *testing.T) {
t.Parallel()
f := createFetch(t)
expectedIds := generateMaliciousIDs(t)
f.mMalS.EXPECT().Request(gomock.Any(), p2p.Peer("p0"), []byte{}).Return(expectedIds, nil)
resp := codec.MustEncode(&MaliciousIDs{NodeIDs: expectedIds})
f.mh.EXPECT().ID().Return("self").AnyTimes()
f.mMalS.EXPECT().Request(gomock.Any(), p2p.Peer("p0"), []byte{}).Return(resp, nil)
ids, err := f.GetMaliciousIDs(context.Background(), "p0")
require.NoError(t, err)
require.Equal(t, expectedIds, ids)
Expand Down
6 changes: 2 additions & 4 deletions fetch/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,10 @@ func TestP2PMaliciousIDs(t *testing.T) {
tpf.serverDB.Close()
}

out, err := tpf.clientFetch.GetMaliciousIDs(context.Background(), tpf.serverID)
malIDs, err := tpf.clientFetch.GetMaliciousIDs(context.Background(), tpf.serverID)
if errStr == "" {
require.NoError(t, err)
var got MaliciousIDs
require.NoError(t, codec.Decode(out, &got))
require.ElementsMatch(t, bad, got.NodeIDs)
require.ElementsMatch(t, bad, malIDs)
} else {
require.ErrorContains(t, err, errStr)
}
Expand Down
9 changes: 9 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import (
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/blockssync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
"github.com/spacemeshos/go-spacemesh/system"
"github.com/spacemeshos/go-spacemesh/timesync"
timeCfg "github.com/spacemeshos/go-spacemesh/timesync/config"
Expand Down Expand Up @@ -862,6 +863,9 @@ func (app *App) initServices(ctx context.Context) error {
syncerConf.SyncCertDistance = app.Config.Tortoise.Hdist
syncerConf.Standalone = app.Config.Standalone

if app.Config.P2P.MinPeers < app.Config.Sync.MalSync.MinSyncPeers {
app.Config.Sync.MalSync.MinSyncPeers = max(1, app.Config.P2P.MinPeers)
}
app.syncLogger = app.addLogger(SyncLogger, lg)
newSyncer := syncer.NewSyncer(
app.cachedDB,
Expand All @@ -876,6 +880,11 @@ func (app *App) initServices(ctx context.Context) error {
atxsync.WithConfig(app.Config.Sync.AtxSync),
atxsync.WithLogger(app.syncLogger.Zap()),
),
malsync.New(fetcher, app.db, app.localDB,
malsync.WithConfig(app.Config.Sync.MalSync),
malsync.WithLogger(app.syncLogger.Zap()),
malsync.WithPeerErrMetric(syncer.MalPeerError),
),
syncer.WithConfig(syncerConf),
syncer.WithLogger(app.syncLogger),
)
Expand Down
45 changes: 45 additions & 0 deletions sql/malsync/malsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package malsync

import (
"fmt"
"time"

"github.com/spacemeshos/go-spacemesh/sql"
)

func GetSyncState(db sql.Executor) (time.Time, error) {
var timestamp time.Time
rows, err := db.Exec("select timestamp from malfeasance_sync_state",
nil, func(stmt *sql.Statement) bool {
v := stmt.ColumnInt64(0)
if v > 0 {
timestamp = time.Unix(v, 0)
}
return true
})
if err != nil {
return time.Time{}, fmt.Errorf("error getting malfeasance sync state: %w", err)
} else if rows != 1 {
return time.Time{}, fmt.Errorf("expected malfeasance_sync_state to have 1 row but got %d rows", rows)
}
return timestamp, nil
}

func updateSyncState(db sql.Executor, ts int64) error {
_, err := db.Exec("update malfeasance_sync_state set timestamp = ?1",
func(stmt *sql.Statement) {
stmt.BindInt64(1, ts)
}, nil)
if err != nil {
return fmt.Errorf("error updating malfeasance sync state: %w", err)
}
return nil
}

func UpdateSyncState(db sql.Executor, timestamp time.Time) error {
return updateSyncState(db, timestamp.Unix())
}

func Clear(db sql.Executor) error {
return updateSyncState(db, 0)
}
29 changes: 29 additions & 0 deletions sql/malsync/malsync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package malsync

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/spacemeshos/go-spacemesh/sql/localsql"
)

func TestMalfeasanceSyncState(t *testing.T) {
db := localsql.InMemory()
timestamp, err := GetSyncState(db)
require.NoError(t, err)
require.Equal(t, time.Time{}, timestamp)
ts := time.Now()
for i := 0; i < 3; i++ {
require.NoError(t, UpdateSyncState(db, ts))
timestamp, err = GetSyncState(db)
require.NoError(t, err)
require.Equal(t, ts.Truncate(time.Second), timestamp)
ts = ts.Add(3 * time.Minute)
}
require.NoError(t, Clear(db))
timestamp, err = GetSyncState(db)
require.NoError(t, err)
require.Equal(t, time.Time{}, timestamp)
}
6 changes: 6 additions & 0 deletions sql/migrations/local/0007_malfeasance_sync.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE malfeasance_sync_state
(
timestamp INT NOT NULL
);

INSERT INTO malfeasance_sync_state (timestamp) VALUES (0);
62 changes: 0 additions & 62 deletions syncer/data_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,68 +61,6 @@ func (e *threadSafeErr) join(err error) {
e.err = errors.Join(e.err, err)
}

// PollMaliciousProofs polls all peers for malicious NodeIDs.
func (d *DataFetch) PollMaliciousProofs(ctx context.Context) error {
peers := d.fetcher.SelectBestShuffled(fetch.RedundantPeers)
logger := d.logger.WithContext(ctx)

maliciousIDs := make(chan fetch.MaliciousIDs, len(peers))
var eg errgroup.Group
fetchErr := threadSafeErr{}
for _, peer := range peers {
peer := peer
eg.Go(func() error {
data, err := d.fetcher.GetMaliciousIDs(ctx, peer)
if err != nil {
malPeerError.Inc()
logger.With().Debug("failed to get malicious IDs", log.Err(err), log.Stringer("peer", peer))
fetchErr.join(err)
return nil
}
var malIDs fetch.MaliciousIDs
if err := codec.Decode(data, &malIDs); err != nil {
logger.With().Debug("failed to decode", log.Err(err))
fetchErr.join(err)
return nil
}
logger.With().Debug("received malicious id from peer", log.Stringer("peer", peer))
maliciousIDs <- malIDs
return nil
})
}
_ = eg.Wait()
close(maliciousIDs)

allIds := make(map[types.NodeID]struct{})
success := false
for ids := range maliciousIDs {
success = true
for _, id := range ids.NodeIDs {
allIds[id] = struct{}{}
}
}
if !success {
return fetchErr.err
}

var idsToFetch []types.NodeID
for nodeID := range allIds {
if exists, err := d.ids.IdentityExists(nodeID); err != nil {
logger.With().Error("failed to check identity", log.Err(err))
continue
} else if !exists {
logger.With().Info("malicious identity does not exist", log.Stringer("identity", nodeID))
continue
}
idsToFetch = append(idsToFetch, nodeID)
}

if err := d.fetcher.GetMalfeasanceProofs(ctx, idsToFetch); err != nil {
return fmt.Errorf("getting malfeasance proofs: %w", err)
}
return nil
}

// PollLayerData polls all peers for data in the specified layer.
func (d *DataFetch) PollLayerData(ctx context.Context, lid types.LayerID, peers ...p2p.Peer) error {
if len(peers) == 0 {
Expand Down
Loading

0 comments on commit ea8fd8c

Please sign in to comment.