Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Probabilistically track in-flight IWANT requests and apply penalty for broken promises #324

Merged
merged 6 commits into from
May 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions gossip_tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package pubsub

import (
"math/rand"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)

// gossipTracer is an internal tracer that tracks IWANT requests in order to penalize
// peers who don't follow up on IWANT requests after an IHAVE advertisement.
// The tracking of promises is probabilistic to avoid using too much memory:
// AddPromise records a single randomly chosen message ID per call rather than
// every message ID that was requested.
type gossipTracer struct {
	// Embedded mutex; the methods that read or modify promises hold it while doing so.
	sync.Mutex
	// msgID computes the ID of a message. It is initialized to DefaultMsgIdFn
	// and overwritten in Start with the router's gs.p.msgID.
	msgID MsgIdFunction
	// promises maps a message ID to the peers we expect to deliver it, each with
	// the deadline after which the promise is reported as broken.
	promises map[string]map[peer.ID]time.Time
}

// newGossipTracer returns a tracer with an empty promise table that computes
// message IDs with DefaultMsgIdFn until Start installs another function.
func newGossipTracer() *gossipTracer {
	gt := new(gossipTracer)
	gt.msgID = DefaultMsgIdFn
	gt.promises = make(map[string]map[peer.ID]time.Time)
	return gt
}

// Start switches the tracer to the message ID function of the router's
// pubsub instance (gs.p.msgID). Calling it on a nil tracer is a no-op.
func (gt *gossipTracer) Start(gs *GossipSubRouter) {
	if gt != nil {
		gt.msgID = gs.p.msgID
	}
}

// AddPromise tracks a promise by peer p to deliver a message from the list of
// msgIDs we are requesting with IWANT.
//
// Only one message ID, picked uniformly at random from msgIDs, is tracked per
// call; this keeps memory usage bounded at the cost of probabilistic detection.
// The promise expires GossipSubIWantFollowupTime after it is first recorded.
//
// Calling AddPromise on a nil tracer or with an empty msgIDs list is a no-op.
func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) {
	if gt == nil {
		return
	}

	// rand.Intn panics for a non-positive argument, and there is nothing to
	// sample from an empty request anyway.
	if len(msgIDs) == 0 {
		return
	}

	// pick the single message ID that will stand in for the whole request
	mid := msgIDs[rand.Intn(len(msgIDs))]

	gt.Lock()
	defer gt.Unlock()

	peers, ok := gt.promises[mid]
	if !ok {
		peers = make(map[peer.ID]time.Time)
		gt.promises[mid] = peers
	}

	// If the peer already has an outstanding promise for this message, keep the
	// original (earlier) deadline instead of extending it.
	if _, ok := peers[p]; !ok {
		peers[p] = time.Now().Add(GossipSubIWantFollowupTime)
	}
}

// GetBrokenPromises reports, for each peer, how many tracked promises have
// passed their deadline without being cleared. Expired promises are removed
// from the tracer as they are counted, so each one is reported only once.
// The result is nil when the tracer is nil or when nothing has expired.
func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
	if gt == nil {
		return nil
	}

	gt.Lock()
	defer gt.Unlock()

	cutoff := time.Now()
	var broken map[peer.ID]int

	for id, promised := range gt.promises {
		for who, deadline := range promised {
			if !deadline.Before(cutoff) {
				// still within the follow-up window
				continue
			}

			// allocate the result lazily so that "nothing broken" stays nil
			if broken == nil {
				broken = make(map[peer.ID]int)
			}
			broken[who]++

			delete(promised, who)
		}

		// drop the message entry once no peer has an outstanding promise for it
		if len(promised) == 0 {
			delete(gt.promises, id)
		}
	}

	return broken
}

// Compile-time assertion that *gossipTracer implements internalTracer.
var _ internalTracer = (*gossipTracer)(nil)

// DeliverMessage clears every promise tracked for the delivered message:
// once someone has delivered it, no peer is penalized for it anymore.
func (gt *gossipTracer) DeliverMessage(msg *Message) {
	id := gt.msgID(msg.Message)

	gt.Lock()
	delete(gt.promises, id)
	gt.Unlock()
}

// RejectMessage stops tracking promises for a rejected message, leaving the
// penalty to the invalid message delivery scoring.
// The exception is a message rejected for a missing signature, an invalid
// signature, or self origin: such a message is obviously invalid, so its
// promises stay tracked and the promise penalty still applies on expiry.
func (gt *gossipTracer) RejectMessage(msg *Message, reason string) {
	switch reason {
	case rejectMissingSignature, rejectInvalidSignature, rejectSelfOrigin:
		return
	}

	id := gt.msgID(msg.Message)

	gt.Lock()
	delete(gt.promises, id)
	gt.Unlock()
}

// The remaining tracer events are intentionally ignored by the gossip tracer;
// every method below has an empty body.
// NOTE(review): presumably these exist only to satisfy internalTracer — confirm
// against the interface definition.

// AddPeer is a no-op.
func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {}

// RemovePeer is a no-op.
func (gt *gossipTracer) RemovePeer(p peer.ID) {}

// Join is a no-op.
func (gt *gossipTracer) Join(topic string) {}

// Leave is a no-op.
func (gt *gossipTracer) Leave(topic string) {}

// Graft is a no-op.
func (gt *gossipTracer) Graft(p peer.ID, topic string) {}

// Prune is a no-op.
func (gt *gossipTracer) Prune(p peer.ID, topic string) {}

// ValidateMessage is a no-op.
func (gt *gossipTracer) ValidateMessage(msg *Message) {}

// DuplicateMessage is a no-op.
func (gt *gossipTracer) DuplicateMessage(msg *Message) {}
101 changes: 101 additions & 0 deletions gossip_tracer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package pubsub

import (
"testing"
"time"

pb "github.com/libp2p/go-libp2p-pubsub/pb"

"github.com/libp2p/go-libp2p-core/peer"
)

// TestBrokenPromises checks that unfulfilled promises are tracked correctly:
// two peers each get one promise recorded, nothing is reported before the
// follow-up window elapses, and exactly one broken promise per peer is
// reported afterwards.
func TestBrokenPromises(t *testing.T) {
	// shorten the follow-up window so the test doesn't have to wait long,
	// and restore the package-level value afterwards
	originalGossipSubIWantFollowupTime := GossipSubIWantFollowupTime
	GossipSubIWantFollowupTime = 100 * time.Millisecond
	defer func() {
		GossipSubIWantFollowupTime = originalGossipSubIWantFollowupTime
	}()

	gt := newGossipTracer()

	peerA := peer.ID("A")
	peerB := peer.ID("B")

	// only the message IDs are needed here; the messages are never delivered
	mids := make([]string, 0, 100)
	for i := 0; i < 100; i++ {
		m := makeTestMessage(i)
		m.From = []byte(peerA)
		mids = append(mids, DefaultMsgIdFn(m))
	}

	gt.AddPromise(peerA, mids)
	gt.AddPromise(peerB, mids)

	// no broken promises yet
	brokenPromises := gt.GetBrokenPromises()
	if len(brokenPromises) != 0 {
		t.Fatal("expected no broken promises")
	}

	// make promises break
	time.Sleep(GossipSubIWantFollowupTime + 10*time.Millisecond)

	brokenPromises = gt.GetBrokenPromises()
	if len(brokenPromises) != 2 {
		t.Fatalf("expected 2 broken promises, got %d", len(brokenPromises))
	}

	brokenPromisesA := brokenPromises[peerA]
	if brokenPromisesA != 1 {
		t.Fatalf("expected 1 broken promise from A, got %d", brokenPromisesA)
	}

	brokenPromisesB := brokenPromises[peerB]
	if brokenPromisesB != 1 {
		t.Fatalf("expected 1 broken promise from B, got %d", brokenPromisesB)
	}
}

// TestNoBrokenPromises records promises for two peers, delivers every message
// right away, and checks that nothing is reported as broken once the
// follow-up window has elapsed.
func TestNoBrokenPromises(t *testing.T) {
	// use a short follow-up window for the duration of the test
	savedFollowupTime := GossipSubIWantFollowupTime
	GossipSubIWantFollowupTime = 100 * time.Millisecond
	defer func() { GossipSubIWantFollowupTime = savedFollowupTime }()

	tracer := newGossipTracer()

	peerA, peerB := peer.ID("A"), peer.ID("B")

	const count = 100
	msgs := make([]*pb.Message, count)
	mids := make([]string, count)
	for i := range msgs {
		m := makeTestMessage(i)
		m.From = []byte(peerA)
		msgs[i] = m
		mids[i] = DefaultMsgIdFn(m)
	}

	tracer.AddPromise(peerA, mids)
	tracer.AddPromise(peerB, mids)

	// fulfil the promises by delivering every message
	for i := range msgs {
		tracer.DeliverMessage(&Message{Message: msgs[i]})
	}

	time.Sleep(GossipSubIWantFollowupTime + 10*time.Millisecond)

	// there should be no broken promises
	if broken := tracer.GetBrokenPromises(); broken != nil {
		t.Fatal("expected no broken promises")
	}
}
38 changes: 33 additions & 5 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ var (

// Maximum number of IHAVE messages to accept from a peer within a heartbeat.
GossipSubMaxIHaveMessages = 10

// Time to wait for a message requested through IWANT following an IHAVE advertisement.
// If the message is not received within this window, a broken promise is declared and
// the router may apply behavioural penalties.
GossipSubIWantFollowupTime = 3 * time.Second
)

// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
Expand Down Expand Up @@ -147,11 +152,17 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt
gs.acceptPXThreshold = thresholds.AcceptPXThreshold
gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold

gs.gossipTracer = newGossipTracer()

// hook the tracer
if ps.tracer != nil {
ps.tracer.score = gs.score
ps.tracer.internal = append(ps.tracer.internal, gs.score, gs.gossipTracer)
} else {
ps.tracer = &pubsubTracer{score: gs.score, pid: ps.host.ID(), msgID: ps.msgID}
ps.tracer = &pubsubTracer{
internal: []internalTracer{gs.score, gs.gossipTracer},
pid: ps.host.ID(),
msgID: ps.msgID,
}
}

return nil
Expand Down Expand Up @@ -234,9 +245,11 @@ type GossipSubRouter struct {
iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat
backoff map[string]map[peer.ID]time.Time // prune backoff
connect chan connectInfo // px connection requests
mcache *MessageCache
tracer *pubsubTracer
score *peerScore

mcache *MessageCache
tracer *pubsubTracer
score *peerScore
gossipTracer *gossipTracer

// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
// nodes.
Expand Down Expand Up @@ -298,6 +311,9 @@ func (gs *GossipSubRouter) Attach(p *PubSub) {
// start the scoring
gs.score.Start(gs)

// and the gossip tracing
gs.gossipTracer.Start(gs)

// start using the same msg ID function as PubSub for caching messages.
gs.mcache.SetMsgIdFn(p.msgID)

Expand Down Expand Up @@ -460,6 +476,8 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
iwantlst = iwantlst[:iask]
gs.iasked[p] += iask

gs.gossipTracer.AddPromise(p, iwantlst)

return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}}
}

Expand Down Expand Up @@ -1091,6 +1109,9 @@ func (gs *GossipSubRouter) heartbeat() {
// clean up iasked counters
gs.clearIHaveCounters()

// apply IWANT request penalties
gs.applyIwantPenalties()

// ensure direct peers are connected
gs.directConnect()

Expand Down Expand Up @@ -1273,6 +1294,13 @@ func (gs *GossipSubRouter) clearIHaveCounters() {
}
}

// applyIwantPenalties collects the broken promises reported by the gossip
// tracer and, for each peer, logs the count and adds it as a penalty through
// the peer score.
func (gs *GossipSubRouter) applyIwantPenalties() {
	broken := gs.gossipTracer.GetBrokenPromises()
	for peerID, n := range broken {
		log.Infof("peer %s didn't follow up in %d IWANT requests; adding penalty", peerID, n)
		gs.score.AddPenalty(peerID, n)
	}
}

func (gs *GossipSubRouter) clearBackoff() {
// we only clear once every 15 ticks to avoid iterating over the map(s) too much
if gs.heartbeatTicks%15 != 0 {
Expand Down
27 changes: 26 additions & 1 deletion gossipsub_spam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,20 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
attacker := hosts[1]

// Set up gossipsub on the legit host
ps, err := NewGossipSub(ctx, legit)
ps, err := NewGossipSub(ctx, legit,
WithPeerScore(
&PeerScoreParams{
AppSpecificScore: func(peer.ID) float64 { return 0 },
BehaviourPenaltyWeight: -1,
BehaviourPenaltyDecay: ScoreParameterDecay(time.Minute),
DecayInterval: DefaultDecayInterval,
DecayToZero: DefaultDecayToZero,
},
&PeerScoreThresholds{
GossipThreshold: -100,
PublishThreshold: -500,
GraylistThreshold: -1000,
}))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -200,6 +213,12 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
}
firstBatchCount := iwc

// the score should still be 0 because we haven't broken any promises yet
score := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score != 0 {
t.Fatalf("Expected 0 score, but got %f", score)
}

// Wait for a heartbeat
time.Sleep(GossipSubHeartbeatInterval)

Expand All @@ -222,6 +241,12 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
if iwc-firstBatchCount > GossipSubMaxIHaveLength {
t.Fatalf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc-firstBatchCount)
}

// The score should now be negative because of broken promises
score = ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score >= 0 {
t.Fatalf("Expected negative score, but got %f", score)
}
}()
}
}
Expand Down
2 changes: 1 addition & 1 deletion score.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type peerScore struct {
inspectPeriod time.Duration
}

var _ scoreTracer = (*peerScore)(nil)
var _ internalTracer = (*peerScore)(nil)

type messageDeliveries struct {
records map[string]*deliveryRecord
Expand Down
3 changes: 2 additions & 1 deletion score_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ type PeerScoreParams struct {
// P7: behavioural pattern penalties.
// This parameter has an associated counter which tracks misbehaviour as detected by the
// router. The router currently applies penalties for the following behaviors:
// - attempting to re-graft before the prune backoff time has elapsed
// - attempting to re-graft before the prune backoff time has elapsed.
// - not following up in IWANT requests for messages advertised with IHAVE.
//
// The value of the parameter is the square of the counter, which decays with BehaviourPenaltyDecay.
// The weight of the parameter MUST be negative (or zero to disable).
Expand Down
Loading