Skip to content

Commit

Permalink
syncer: sync malfeasance proof when the node is not atx synced (#3982)
Browse files Browse the repository at this point in the history
## Motivation
<!-- Please mention the issue fixed by this PR or detailed motivation -->
Closes #3920
<!-- `Closes #XXXX, closes #XXXX, ...` links mentioned issues to this PR and automatically closes them when this it's merged -->

## Changes
<!-- Please describe in detail the changes made -->
syncer syncs MalfeasanceProof after ATXs are synced. it has to be synced after ATXs to avoid being spammed by unknown identities.

it is synced as follows.
- poll peers for all the malicious NodeIDs
- if those NodeIDs exists, fetch the malfeasance proofs associated with the NodeID hash in batches

this implementation is sub-optimal (see #3987) and is intended to be a genesis compromise
  • Loading branch information
countvonzero committed Jan 27, 2023
1 parent 13a9c6d commit 13a14b6
Show file tree
Hide file tree
Showing 24 changed files with 785 additions and 77 deletions.
17 changes: 8 additions & 9 deletions cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,12 @@ func (app *App) initServices(
}
return pubsub.ValidationIgnore
}
atxSyncHandler := func(_ context.Context, _ p2p.Peer, _ []byte) pubsub.ValidationResult {
if newSyncer.ListenToATXGossip() {
return pubsub.ValidationAccept
}
return pubsub.ValidationIgnore
}

app.host.Register(pubsub.BeaconWeakCoinProtocol, pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleWeakCoinProposal))
app.host.Register(pubsub.BeaconProposalProtocol,
Expand All @@ -711,18 +717,11 @@ func (app *App) initServices(
app.host.Register(pubsub.BeaconFollowingVotesProtocol,
pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleFollowingVotes))
app.host.Register(pubsub.ProposalProtocol, pubsub.ChainGossipHandler(syncHandler, proposalListener.HandleProposal))
app.host.Register(pubsub.AtxProtocol, pubsub.ChainGossipHandler(
func(_ context.Context, _ p2p.Peer, _ []byte) pubsub.ValidationResult {
if newSyncer.ListenToATXGossip() {
return pubsub.ValidationAccept
}
return pubsub.ValidationIgnore
},
atxHandler.HandleGossipAtx))
app.host.Register(pubsub.AtxProtocol, pubsub.ChainGossipHandler(atxSyncHandler, atxHandler.HandleGossipAtx))
app.host.Register(pubsub.TxProtocol, pubsub.ChainGossipHandler(syncHandler, txHandler.HandleGossipTransaction))
app.host.Register(pubsub.HareProtocol, pubsub.ChainGossipHandler(syncHandler, app.hare.GetHareMsgHandler()))
app.host.Register(pubsub.BlockCertify, pubsub.ChainGossipHandler(syncHandler, app.certifier.HandleCertifyMessage))
app.host.Register(pubsub.MalfeasanceProof, malfeasanceHandler.HandleMalfeasanceProof)
app.host.Register(pubsub.MalfeasanceProof, pubsub.ChainGossipHandler(atxSyncHandler, malfeasanceHandler.HandleMalfeasanceProof))

app.proposalBuilder = proposalBuilder
app.proposalListener = proposalListener
Expand Down
24 changes: 23 additions & 1 deletion common/types/nodeid.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"encoding/hex"

"github.com/spacemeshos/ed25519"
"github.com/spacemeshos/go-scale"

"github.com/spacemeshos/go-spacemesh/log"
)

//go:generate scalegen

// BytesToNodeID is a helper to copy buffer into NodeID struct.
func BytesToNodeID(buf []byte) (id NodeID) {
copy(id[:], buf)
Expand All @@ -30,7 +33,7 @@ var ExtractNodeIDFromSig = func(msg, sig []byte) (NodeID, error) {
type NodeID Hash32

const (
// ATXIDSize in bytes.
// NodeIDSize in bytes.
NodeIDSize = Hash32Length
)

Expand All @@ -55,3 +58,22 @@ func (id NodeID) Field() log.Field { return log.Stringer("node_id", id) }

// EmptyNodeID is a canonical empty NodeID.
var EmptyNodeID NodeID

// EncodeScale implements scale codec interface.
func (id *NodeID) EncodeScale(e *scale.Encoder) (int, error) {
return scale.EncodeByteArray(e, id[:])
}

// DecodeScale implements scale codec interface.
func (id *NodeID) DecodeScale(d *scale.Decoder) (int, error) {
return scale.DecodeByteArray(d, id[:])
}

// NodeIDsToHashes turns a list of NodeID into their Hash32 representation.
func NodeIDsToHashes(ids []NodeID) []Hash32 {
hashes := make([]Hash32, 0, len(ids))
for _, id := range ids {
hashes = append(hashes, Hash32(id))
}
return hashes
}
2 changes: 1 addition & 1 deletion common/types/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func RandomATXID() ATXID {
return ATXID(CalcHash32(b))
}

// RandomATXID generates a random ATXID for testing.
// RandomNodeID generates a random NodeID for testing.
func RandomNodeID() NodeID {
b := make([]byte, NodeIDSize)
_, err := rand.Read(b)
Expand Down
27 changes: 21 additions & 6 deletions datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,30 @@ func (db *CachedDB) GetPrevAtx(nodeID types.NodeID) (*types.ActivationTxHeader,
}
}

// IdentityExists returns true if this NodeID has published any ATX.
func (db *CachedDB) IdentityExists(nodeID types.NodeID) (bool, error) {
_, err := atxs.GetLastIDByNodeID(db, nodeID)
if err != nil {
if errors.Is(err, sql.ErrNotFound) {
return false, nil
}
return false, err
}
return true, nil
}

// Hint marks which DB should be queried for a certain provided hash.
type Hint string

// DB hints per DB.
const (
BallotDB Hint = "ballotDB"
BlockDB Hint = "blocksDB"
ProposalDB Hint = "proposalDB"
ATXDB Hint = "ATXDB"
TXDB Hint = "TXDB"
POETDB Hint = "POETDB"
BallotDB Hint = "ballotDB"
BlockDB Hint = "blocksDB"
ProposalDB Hint = "proposalDB"
ATXDB Hint = "ATXDB"
TXDB Hint = "TXDB"
POETDB Hint = "POETDB"
Malfeasance Hint = "malfeasance"
)

// NewBlobStore returns a BlobStore.
Expand Down Expand Up @@ -269,6 +282,8 @@ func (bs *BlobStore) Get(hint Hint, key []byte) ([]byte, error) {
return transactions.GetBlob(bs.DB, key)
case POETDB:
return poets.Get(bs.DB, key)
case Malfeasance:
return identities.GetMalfeasanceBlob(bs.DB, key)
}
return nil, fmt.Errorf("blob store not found %s", hint)
}
Expand Down
64 changes: 63 additions & 1 deletion datastore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package datastore_test

import (
"bytes"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/spacemeshos/go-spacemesh/activation"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
Expand All @@ -22,6 +24,13 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/transactions"
)

func TestMain(m *testing.M) {
types.SetLayersPerEpoch(3)

res := m.Run()
os.Exit(res)
}

func TestMalfeasanceProof_Honest(t *testing.T) {
db := sql.InMemory()
cdb := datastore.NewCachedDB(db, logtest.New(t))
Expand Down Expand Up @@ -137,8 +146,36 @@ func TestMalfeasanceProof_Dishonest(t *testing.T) {
require.Equal(t, 2, cdb.MalfeasanceCacheSize())
}

func TestIdentityExists(t *testing.T) {
cdb := datastore.NewCachedDB(sql.InMemory(), logtest.New(t))

signer, err := signing.NewEdSigner()
require.NoError(t, err)

exists, err := cdb.IdentityExists(signer.NodeID())
require.NoError(t, err)
require.False(t, exists)

atx := &types.ActivationTx{
InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
PubLayerID: types.NewLayerID(22),
Sequence: 11,
},
NumUnits: 11,
},
}
require.NoError(t, activation.SignAndFinalizeAtx(signer, atx))
vAtx, err := atx.Verify(0, 1)
require.NoError(t, err)
require.NoError(t, atxs.Add(cdb, vAtx, time.Now()))

exists, err = cdb.IdentityExists(signer.NodeID())
require.NoError(t, err)
require.True(t, exists)
}

func TestBlobStore_GetATXBlob(t *testing.T) {
types.SetLayersPerEpoch(3)
db := sql.InMemory()
bs := datastore.NewBlobStore(db)

Expand Down Expand Up @@ -295,3 +332,28 @@ func TestBlobStore_GetTXBlob(t *testing.T) {
_, err = bs.Get(datastore.BlockDB, tx.ID.Bytes())
require.ErrorIs(t, err, sql.ErrNotFound)
}

func TestBlobStore_GetMalfeasanceBlob(t *testing.T) {
db := sql.InMemory()
bs := datastore.NewBlobStore(db)

proof := &types.MalfeasanceProof{
Layer: types.NewLayerID(11),
Proof: types.Proof{
Type: types.HareEquivocation,
Data: &types.HareProof{
Messages: [2]types.HareProofMsg{{}, {}},
},
},
}
encoded, err := codec.Encode(proof)
require.NoError(t, err)
nodeID := types.NodeID{1, 2, 3}

_, err = bs.Get(datastore.Malfeasance, nodeID.Bytes())
require.ErrorIs(t, err, sql.ErrNotFound)
require.NoError(t, identities.SetMalicious(db, nodeID, encoded))
got, err := bs.Get(datastore.Malfeasance, nodeID.Bytes())
require.NoError(t, err)
require.Equal(t, encoded, got)
}
10 changes: 10 additions & 0 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
lyrOpnsProtocol = "lp/1"
hashProtocol = "hs/1"
meshHashProtocol = "mh/1"
malProtocol = "ml/1"

cacheSize = 1000
)
Expand Down Expand Up @@ -123,6 +124,13 @@ func WithLogger(log log.Log) Option {
}
}

// WithMalfeasanceHandler configures the malfeasance handler of the fetcher.
func WithMalfeasanceHandler(h malfeasanceHandler) Option {
return func(f *Fetch) {
f.malHandler = h
}
}

// WithATXHandler configures the ATX handler of the fetcher.
func WithATXHandler(h atxHandler) Option {
return func(f *Fetch) {
Expand Down Expand Up @@ -186,6 +194,7 @@ type Fetch struct {

servers map[string]requester
poetHandler poetHandler
malHandler malfeasanceHandler
atxHandler atxHandler
ballotHandler ballotHandler
blockHandler blockHandler
Expand Down Expand Up @@ -238,6 +247,7 @@ func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter,
f.servers[lyrOpnsProtocol] = server.New(host, lyrOpnsProtocol, h.handleLayerOpinionsReq, srvOpts...)
f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(host, meshHashProtocol, h.handleMeshHashReq, srvOpts...)
f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...)
}
return f
}
Expand Down
6 changes: 6 additions & 0 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
type testFetch struct {
*Fetch
mh *mocks.Mockhost
mMalS *mocks.Mockrequester
mAtxS *mocks.Mockrequester
mLyrS *mocks.Mockrequester
mOpnS *mocks.Mockrequester
mHashS *mocks.Mockrequester
mMHashS *mocks.Mockrequester

mMesh *mocks.MockmeshProvider
mMalH *mocks.MockmalfeasanceHandler
mAtxH *mocks.MockatxHandler
mBallotH *mocks.MockballotHandler
mBlocksH *mocks.MockblockHandler
Expand All @@ -42,11 +44,13 @@ func createFetch(tb testing.TB) *testFetch {
ctrl := gomock.NewController(tb)
tf := &testFetch{
mh: mocks.NewMockhost(ctrl),
mMalS: mocks.NewMockrequester(ctrl),
mAtxS: mocks.NewMockrequester(ctrl),
mLyrS: mocks.NewMockrequester(ctrl),
mOpnS: mocks.NewMockrequester(ctrl),
mHashS: mocks.NewMockrequester(ctrl),
mMHashS: mocks.NewMockrequester(ctrl),
mMalH: mocks.NewMockmalfeasanceHandler(ctrl),
mAtxH: mocks.NewMockatxHandler(ctrl),
mBallotH: mocks.NewMockballotHandler(ctrl),
mBlocksH: mocks.NewMockblockHandler(ctrl),
Expand All @@ -67,13 +71,15 @@ func createFetch(tb testing.TB) *testFetch {
WithContext(context.TODO()),
WithConfig(cfg),
WithLogger(lg),
WithMalfeasanceHandler(tf.mMalH),
WithATXHandler(tf.mAtxH),
WithBallotHandler(tf.mBallotH),
WithBlockHandler(tf.mBlocksH),
WithProposalHandler(tf.mProposalH),
WithTXHandler(tf.mTxH),
WithPoetHandler(tf.mPoetH),
withServers(map[string]requester{
malProtocol: tf.mMalS,
atxProtocol: tf.mAtxS,
lyrDataProtocol: tf.mLyrS,
lyrOpnsProtocol: tf.mOpnS,
Expand Down
19 changes: 19 additions & 0 deletions fetch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/ballots"
"github.com/spacemeshos/go-spacemesh/sql/blocks"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
"github.com/spacemeshos/go-spacemesh/sql/identities"
"github.com/spacemeshos/go-spacemesh/sql/layers"
"github.com/spacemeshos/go-spacemesh/system"
)
Expand All @@ -36,6 +37,24 @@ func newHandler(cdb *datastore.CachedDB, bs *datastore.BlobStore, m meshProvider
}
}

// handleEpochInfoReq returns the ATXs published in the specified epoch.
func (h *handler) handleMaliciousIDsReq(ctx context.Context, _ []byte) ([]byte, error) {
nodes, err := identities.GetMalicious(h.cdb)
if err != nil {
h.logger.WithContext(ctx).With().Warning("failed to get malicious IDs", log.Err(err))
return nil, err
}
h.logger.WithContext(ctx).With().Debug("responded to malicious IDs request", log.Int("num_malicious", len(nodes)))
malicious := &MaliciousIDs{
NodeIDs: nodes,
}
data, err := codec.Encode(malicious)
if err != nil {
h.logger.With().Fatal("failed to encode malicious IDs", log.Err(err))
}
return data, nil
}

// handleEpochInfoReq returns the ATXs published in the specified epoch.
func (h *handler) handleEpochInfoReq(ctx context.Context, msg []byte) ([]byte, error) {
epoch := types.EpochID(util.BytesToUint32(msg))
Expand Down
Loading

0 comments on commit 13a14b6

Please sign in to comment.