Skip to content

Commit

Permalink
Remove ipni relay
Browse files Browse the repository at this point in the history
  • Loading branch information
ribasushi committed Feb 25, 2024
1 parent e810105 commit d4ab8db
Show file tree
Hide file tree
Showing 7 changed files with 1 addition and 293 deletions.
168 changes: 0 additions & 168 deletions chain/sub/incoming.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package sub

import (
"bytes"
"context"
"encoding/binary"
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
bserv "github.com/ipfs/boxo/blockservice"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipni/go-libipni/announce/message"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -27,11 +23,9 @@ import (
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/sub/ratelimit"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/unixfs"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/impl/full"
)

var log = logging.Logger("sub")
Expand Down Expand Up @@ -456,165 +450,3 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType
)
stats.Record(ctx, metric.M(1))
}

type peerMsgInfo struct {
peerID peer.ID
lastCid cid.Cid
lastSeqno uint64
rateLimit *ratelimit.Window
mutex sync.Mutex
}

type IndexerMessageValidator struct {
self peer.ID

peerCache *lru.TwoQueueCache[address.Address, *peerMsgInfo]
chainApi full.ChainModuleAPI
stateApi full.StateModuleAPI
}

func NewIndexerMessageValidator(self peer.ID, chainApi full.ChainModuleAPI, stateApi full.StateModuleAPI) *IndexerMessageValidator {
peerCache, _ := lru.New2Q[address.Address, *peerMsgInfo](8192)

return &IndexerMessageValidator{
self: self,
peerCache: peerCache,
chainApi: chainApi,
stateApi: stateApi,
}
}

func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
// This chain-node should not be publishing its own messages. These are
// relayed from market-nodes. If a node appears to be local, reject it.
if pid == v.self {
log.Debug("ignoring indexer message from self")
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
return pubsub.ValidationIgnore
}
originPeer := msg.GetFrom()
if originPeer == v.self {
log.Debug("ignoring indexer message originating from self")
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
return pubsub.ValidationIgnore
}

idxrMsg := message.Message{}
err := idxrMsg.UnmarshalCBOR(bytes.NewBuffer(msg.Data))
if err != nil {
log.Errorw("Could not decode indexer pubsub message", "err", err)
return pubsub.ValidationReject
}
if len(idxrMsg.ExtraData) == 0 {
log.Debugw("ignoring messsage missing miner id", "peer", originPeer)
return pubsub.ValidationIgnore
}

// Get miner info from lotus
minerAddr, err := address.NewFromBytes(idxrMsg.ExtraData)
if err != nil {
log.Warnw("cannot parse extra data as miner address", "err", err, "extraData", idxrMsg.ExtraData)
return pubsub.ValidationReject
}

msgCid := idxrMsg.Cid

var msgInfo *peerMsgInfo
msgInfo, ok := v.peerCache.Get(minerAddr)
if !ok {
msgInfo = &peerMsgInfo{}
}

// Lock this peer's message info.
msgInfo.mutex.Lock()
defer msgInfo.mutex.Unlock()

if ok {
// Reject replayed messages.
seqno := binary.BigEndian.Uint64(msg.Message.GetSeqno())
if seqno <= msgInfo.lastSeqno {
log.Debugf("ignoring replayed indexer message")
return pubsub.ValidationIgnore
}
msgInfo.lastSeqno = seqno
}

if !ok || originPeer != msgInfo.peerID {
// Check that the miner ID maps to the peer that sent the message.
err = v.authenticateMessage(ctx, minerAddr, originPeer)
if err != nil {
log.Warnw("cannot authenticate messsage", "err", err, "peer", originPeer, "minerID", minerAddr)
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
return pubsub.ValidationReject
}
msgInfo.peerID = originPeer
if !ok {
// Add msgInfo to cache only after being authenticated. If two
// messages from the same peer are handled concurrently, there is a
// small chance that one msgInfo could replace the other here when
// the info is first cached. This is OK, so no need to prevent it.
v.peerCache.Add(minerAddr, msgInfo)
}
}

// See if message needs to be ignored due to rate limiting.
if v.rateLimitPeer(msgInfo, msgCid) {
return pubsub.ValidationIgnore
}

stats.Record(ctx, metrics.IndexerMessageValidationSuccess.M(1))
return pubsub.ValidationAccept
}

func (v *IndexerMessageValidator) rateLimitPeer(msgInfo *peerMsgInfo, msgCid cid.Cid) bool {
const (
msgLimit = 5
msgTimeLimit = 10 * time.Second
repeatTimeLimit = 2 * time.Hour
)

timeWindow := msgInfo.rateLimit

// Check overall message rate.
if timeWindow == nil {
timeWindow = ratelimit.NewWindow(msgLimit, msgTimeLimit)
msgInfo.rateLimit = timeWindow
} else if msgInfo.lastCid == msgCid {
// Check if this is a repeat of the previous message data.
if time.Since(timeWindow.Newest()) < repeatTimeLimit {
log.Warnw("ignoring repeated indexer message", "sender", msgInfo.peerID)
return true
}
}

err := timeWindow.Add()
if err != nil {
log.Warnw("ignoring indexer message", "sender", msgInfo.peerID, "err", err)
return true
}

msgInfo.lastCid = msgCid

return false
}

func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerAddress address.Address, peerID peer.ID) error {
ts, err := v.chainApi.ChainHead(ctx)
if err != nil {
return err
}

minerInfo, err := v.stateApi.StateMinerInfo(ctx, minerAddress, ts.Key())
if err != nil {
return err
}

if minerInfo.PeerId == nil {
return xerrors.New("no peer id for miner")
}
if *minerInfo.PeerId != peerID {
return xerrors.New("miner id does not map to peer that sent message")
}

return nil
}
69 changes: 0 additions & 69 deletions chain/sub/incoming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,14 @@
package sub

import (
"bytes"
"context"
"testing"

"github.com/golang/mock/gomock"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipni/go-libipni/announce/message"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/filecoin-project/go-address"

"github.com/filecoin-project/lotus/api/mocks"
"github.com/filecoin-project/lotus/chain/types"
)

Expand Down Expand Up @@ -72,65 +65,3 @@ func TestFetchCidsWithDedup(t *testing.T) {
t.Fatalf("there is a nil message: first %p, last %p", res[0], res[len(res)-1])
}
}

func TestIndexerMessageValidator_Validate(t *testing.T) {
validCid, err := cid.Decode("QmbpDgg5kRLDgMxS8vPKNFXEcA6D5MC4CkuUdSWDVtHPGK")
if err != nil {
t.Fatal(err)
}
tests := []struct {
name string
selfPID string
senderPID string
extraData []byte
wantValidation pubsub.ValidationResult
}{
{
name: "invalid extra data is rejected",
selfPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW",
senderPID: "12D3KooWE8yt84RVwW3sFcd6WMjbUdWrZer2YtT4dmtj3dHdahSZ",
extraData: []byte("f0127896"), // note, casting encoded address to byte is invalid.
wantValidation: pubsub.ValidationReject,
},
{
name: "same sender and receiver is ignored",
selfPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW",
senderPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW",
wantValidation: pubsub.ValidationIgnore,
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
mc := gomock.NewController(t)
node := mocks.NewMockFullNode(mc)
subject := NewIndexerMessageValidator(peer.ID(tc.selfPID), node, node)
message := message.Message{
Cid: validCid,
Addrs: nil,
ExtraData: tc.extraData,
}
buf := bytes.NewBuffer(nil)
if err := message.MarshalCBOR(buf); err != nil {
t.Fatal(err)
}

topic := "topic"
pbm := &pb.Message{
Data: buf.Bytes(),
Topic: &topic,
From: nil,
Seqno: nil,
}
validate := subject.Validate(context.Background(), peer.ID(tc.senderPID), &pubsub.Message{
Message: pbm,
ReceivedFrom: peer.ID(tc.senderPID),
ValidatorData: nil,
})

if validate != tc.wantValidation {
t.Fatalf("expected %v but got %v", tc.wantValidation, validate)
}
})
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ require (
github.com/ipld/go-codec-dagpb v1.6.0
github.com/ipld/go-ipld-prime v0.20.0
github.com/ipld/go-ipld-selector-text-lite v0.0.1
github.com/ipni/go-libipni v0.0.8
github.com/ipni/index-provider v0.12.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/koalacxr/quantile v0.0.1
Expand Down Expand Up @@ -232,6 +231,7 @@ require (
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
github.com/ipfs/go-verifcid v0.0.2 // indirect
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 // indirect
github.com/ipni/go-libipni v0.0.8 // indirect
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c // indirect
Expand Down
2 changes: 0 additions & 2 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ const (
HandleIncomingMessagesKey
HandlePaymentChannelManagerKey

RelayIndexerMessagesKey

// miner
PreflightChecksKey
GetParamsKey
Expand Down
2 changes: 0 additions & 2 deletions node/builder_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ var ChainNode = Options(

Override(new(*full.GasPriceCache), full.NewGasPriceCache),

Override(RelayIndexerMessagesKey, modules.RelayIndexerMessages),

// Lite node API
ApplyIf(isLiteNode,
Override(new(messagepool.Provider), messagepool.NewProviderLite),
Expand Down
20 changes: 0 additions & 20 deletions node/modules/lp2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
}

ingestTopicParams := &pubsub.TopicScoreParams{
// expected ~0.5 confirmed deals / min. sampled
TopicWeight: 0.1,

TimeInMeshWeight: 0.00027, // ~1/3600
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 1,

FirstMessageDeliveriesWeight: 0.5,
FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
FirstMessageDeliveriesCap: 100, // allowing for burstiness

InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
}

topicParams := map[string]*pubsub.TopicScoreParams{
build.BlocksTopic(in.Nn): {
// expected 10 blocks/min
Expand Down Expand Up @@ -249,9 +233,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
drandTopics = append(drandTopics, topic)
}

// Index ingestion whitelist
topicParams[build.IndexerIngestTopic(in.Nn)] = ingestTopicParams

// IP colocation whitelist
var ipcoloWhitelist []*net.IPNet
for _, cidr := range in.Cfg.IPColocationWhitelist {
Expand Down Expand Up @@ -376,7 +357,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
allowTopics := []string{
build.BlocksTopic(in.Nn),
build.MessagesTopic(in.Nn),
build.IndexerIngestTopic(in.Nn),
}
allowTopics = append(allowTopics, drandTopics...)
options = append(options,
Expand Down
Loading

0 comments on commit d4ab8db

Please sign in to comment.