Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't add unresponsive DHT servers to the Routing Table #820

Merged
merged 18 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 62 additions & 48 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ const (
protectedBuckets = 2
)

type addPeerRTReq struct {
p peer.ID
queryPeer bool
}

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -128,6 +123,9 @@ type IpfsDHT struct {

autoRefresh bool

// timeout for the lookupCheck operation
lookupCheckTimeout time.Duration

// A function returning a set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
// connecting to the network).
Expand All @@ -143,7 +141,7 @@ type IpfsDHT struct {
disableFixLowPeers bool
fixLowPeersChan chan struct{}

addPeerToRTChan chan addPeerRTReq
addPeerToRTChan chan peer.ID
refreshFinishedCh chan struct{}

rtFreezeTimeout time.Duration
Expand Down Expand Up @@ -247,7 +245,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(p, false)
dht.peerFound(dht.ctx, p)
}
dht.plk.Unlock()

Expand Down Expand Up @@ -309,7 +307,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err

fixLowPeersChan: make(chan struct{}, 1),

addPeerToRTChan: make(chan addPeerRTReq),
addPeerToRTChan: make(chan peer.ID),
refreshFinishedCh: make(chan struct{}),

enableOptProv: cfg.EnableOptimisticProvide,
Expand Down Expand Up @@ -339,6 +337,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
dht.routingTable = rt
dht.bootstrapPeers = cfg.BootstrapPeers

dht.lookupCheckTimeout = cfg.RoutingTable.RefreshQueryTimeout

// init network size estimator
dht.nsEstimator = netsize.NewEstimator(h.ID(), rt, cfg.BucketSize)

Expand Down Expand Up @@ -377,6 +377,18 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
return dht, nil
}

// lookupCheck performs a lookup request to a remote peer.ID, verifying that it is able to
// answer it correctly
func (dht *IpfsDHT) lookupCheck(ctx context.Context, p peer.ID) error {
// lookup request to p requesting for its own peer.ID
peerids, err := dht.protoMessenger.GetClosestPeers(ctx, p, p)
// p should return at least its own peerid
if err == nil && len(peerids) == 0 {
return fmt.Errorf("peer %s failed to return its closest peers, got %d", p, len(peerids))
}
return err
}

func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
Expand All @@ -388,16 +400,11 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb
return err
}

pingFnc := func(ctx context.Context, p peer.ID) error {
_, err := dht.protoMessenger.GetClosestPeers(ctx, p, p) // don't use the PING message type as it's deprecated
return err
}

r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
keyGenFnc,
queryFnc,
pingFnc,
dht.lookupCheck,
cfg.RoutingTable.RefreshQueryTimeout,
cfg.RoutingTable.RefreshInterval,
maxLastSuccessfulOutboundThreshold,
Expand Down Expand Up @@ -505,7 +512,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
// we try to add all peers we are connected to to the Routing Table
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(p, false)
dht.peerFound(ctx, p)
}

// TODO Active Bootstrapping
Expand Down Expand Up @@ -616,22 +623,22 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
select {
case <-timerCh:
dht.routingTable.MarkAllPeersIrreplaceable()
case addReq := <-dht.addPeerToRTChan:
prevSize := dht.routingTable.Size()
if prevSize == 0 {
case p := <-dht.addPeerToRTChan:
if dht.routingTable.Size() == 0 {
isBootsrapping = true
bootstrapCount = 0
timerCh = nil
}
newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootsrapping)
// queryPeer set to true as we only try to add queried peers to the RT
newlyAdded, err := dht.routingTable.TryAddPeer(p, true, isBootsrapping)
if err != nil {
// peer not added.
continue
}
if !newlyAdded && addReq.queryPeer {
if !newlyAdded {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now())
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
case <-dht.refreshFinishedCh:
bootstrapCount = bootstrapCount + 1
Expand All @@ -651,40 +658,47 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
}
}

// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
// If we have a connection a peer but no exchange of a query RPC ->
//
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
// LastUsefulAt=0
//
// If we connect to a peer and then exchange a query RPC ->
//
// LastQueriedAt=time.Now (same reason as above)
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
//
// If we query a peer we already have in our Routing Table ->
//
// LastQueriedAt=time.Now()
// LastUsefulAt remains unchanged
//
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//
// Do Nothing.
func (dht *IpfsDHT) peerFound(p peer.ID, queryPeer bool) {

if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
// peerFound verifies whether the found peer advertises DHT protocols
// and probe it to make sure it answers DHT queries as expected. If
// it fails to answer, it isn't added to the routingTable.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
// if the peer is already in the routing table or the appropriate bucket is
// already full, don't try to add the new peer.ID
if dht.routingTable.Find(p) != "" || !dht.routingTable.UsefulPeer(p) {
return
}

// verify whether the remote peer advertises the right dht protocol
b, err := dht.validRTPeer(p)
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-dht.ctx.Done():

livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout)
defer cancel()

// performing a FIND_NODE query
if err := dht.lookupCheck(livelinessCtx, p); err != nil {
logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err)
guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved
return
}

// if the FIND_NODE succeeded, the peer is considered as valid
dht.validPeerFound(ctx, p)
}
}

// validPeerFound signals the routingTable that we've found a peer that
// supports the DHT protocol, and just answered correctly to a DHT FindPeers
func (dht *IpfsDHT) validPeerFound(ctx context.Context, p peer.ID) {
if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
}

select {
case dht.addPeerToRTChan <- p:
case <-dht.ctx.Done():
return
}
}

Expand Down
4 changes: 2 additions & 2 deletions dht_bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func TestBootstrappersReplacable(t *testing.T) {
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(d5.self, true)
d.peerFound(d1.self, true)
d.peerFound(ctx, d5.self)
d.peerFound(ctx, d1.self)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
Expand Down
11 changes: 9 additions & 2 deletions dht_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dht
import (
"context"
"net"
"sync/atomic"
"testing"

ic "github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -31,12 +32,17 @@ func TestIsRelay(t *testing.T) {
type mockConn struct {
local peer.AddrInfo
remote peer.AddrInfo

isClosed atomic.Bool
}

var _ network.Conn = (*mockConn)(nil)

func (m *mockConn) ID() string { return "0" }
func (m *mockConn) Close() error { return nil }
func (m *mockConn) ID() string { return "0" }
func (m *mockConn) Close() error {
m.isClosed.Store(true)
return nil
}
func (m *mockConn) NewStream(context.Context) (network.Stream, error) { return nil, nil }
func (m *mockConn) GetStreams() []network.Stream { return []network.Stream{} }
func (m *mockConn) Stat() network.ConnStats {
Expand All @@ -50,6 +56,7 @@ func (m *mockConn) LocalPrivateKey() ic.PrivKey { return nil }
func (m *mockConn) RemotePeer() peer.ID { return m.remote.ID }
func (m *mockConn) RemotePublicKey() ic.PubKey { return nil }
func (m *mockConn) ConnState() network.ConnectionState { return network.ConnectionState{} }
func (m *mockConn) IsClosed() bool { return m.isClosed.Load() }

func TestFilterCaching(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
6 changes: 4 additions & 2 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
return false
}

// a peer has queried us, let's add it to RT
dht.peerFound(mPeer, true)
// a peer has queried us, let's add it to RT. A new go routine is required
// because we can't block the stream handler until the remote peer answers
// our query.
go dht.peerFound(dht.ctx, mPeer)

if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
Expand Down
48 changes: 47 additions & 1 deletion dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,8 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) {
connect(t, ctx, dhtA, dhtD)

// and because of the above bootstrap, A also discovers E !
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second)
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 10*time.Second)
time.Sleep(100 * time.Millisecond)
guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
}

Expand Down Expand Up @@ -1327,6 +1328,49 @@ func TestClientModeConnect(t *testing.T) {
}
}

func TestInvalidServer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

a := setupDHT(ctx, t, false)
b := setupDHT(ctx, t, true)

// make b advertise all dht server protocols
for _, proto := range a.serverProtocols {
// Hang on every request.
b.host.SetStreamHandler(proto, func(s network.Stream) {
defer s.Reset() // nolint
<-ctx.Done()
})
}

connectNoSync(t, ctx, a, b)

c := testCaseCids[0]
p := peer.ID("TestPeer")
a.ProviderStore().AddProvider(ctx, c.Hash(), peer.AddrInfo{ID: p})
time.Sleep(time.Millisecond * 5) // just in case...

provs, err := b.FindProviders(ctx, c)
if err != nil {
t.Fatal(err)
}

if len(provs) == 0 {
t.Fatal("Expected to get a provider back")
}

if provs[0].ID != p {
t.Fatal("expected it to be our test peer")
}
if a.routingTable.Find(b.self) != "" {
t.Fatal("DHT clients should not be added to routing tables")
}
if b.routingTable.Find(a.self) == "" {
t.Fatal("DHT server should have been added to the dht client's routing table")
}
}

func TestClientModeFindPeer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -2126,6 +2170,8 @@ func TestPreconnectedNodes(t *testing.T) {
require.NoError(t, err)
defer h2.Close()

connect(t, ctx, d1, d2)

// See if it works
peers, err := d2.GetClosestPeers(ctx, "testkey")
require.NoError(t, err)
Expand Down
Loading