Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't add unresponsive DHT servers to the Routing Table #820

Merged
merged 18 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 98 additions & 47 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ const (
protectedBuckets = 2
)

type addPeerRTReq struct {
p peer.ID
queryPeer bool
}

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -125,6 +120,14 @@ type IpfsDHT struct {

autoRefresh bool

// timeout for the lookupCheck operation
lookupCheckTimeout time.Duration
// time interval during which we don't try to query the same peer again
lookupCheckInterval time.Duration
// recentlyCheckedPeers contains the peers recently queried with the time at which they were queried
recentlyCheckedPeers map[peer.ID]time.Time
recentlyCheckedPeersLk sync.Mutex

// A function returning a set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
// connecting to the network).
Expand All @@ -140,7 +143,7 @@ type IpfsDHT struct {
disableFixLowPeers bool
fixLowPeersChan chan struct{}

addPeerToRTChan chan addPeerRTReq
addPeerToRTChan chan peer.ID
refreshFinishedCh chan struct{}

rtFreezeTimeout time.Duration
Expand Down Expand Up @@ -183,6 +186,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)

dht.autoRefresh = cfg.RoutingTable.AutoRefresh

dht.lookupCheckInterval = cfg.LookupCheckInterval
dht.maxRecordAge = cfg.MaxRecordAge
dht.enableProviders = cfg.EnableProviders
dht.enableValues = cfg.EnableValues
Expand Down Expand Up @@ -233,7 +237,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.ctx, p, false)
dht.peerFound(dht.ctx, p)
}
dht.plk.Unlock()

Expand Down Expand Up @@ -294,7 +298,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err

fixLowPeersChan: make(chan struct{}, 1),

addPeerToRTChan: make(chan addPeerRTReq),
addPeerToRTChan: make(chan peer.ID),
refreshFinishedCh: make(chan struct{}),
}

Expand All @@ -321,6 +325,9 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
dht.routingTable = rt
dht.bootstrapPeers = cfg.BootstrapPeers

dht.lookupCheckTimeout = cfg.RoutingTable.RefreshQueryTimeout
dht.recentlyCheckedPeers = make(map[peer.ID]time.Time)

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
Expand Down Expand Up @@ -352,6 +359,39 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
return dht, nil
}

// lookupCheck performs a lookup request to a remote peer.ID, verifying that it is able to
// answer it correctly
func (dht *IpfsDHT) lookupCheck(ctx context.Context, p peer.ID) error {
// lookup request to p requesting for its own peer.ID
peerids, err := dht.protoMessenger.GetClosestPeers(ctx, p, p)
// p should return at least its own peerid
if err == nil && len(peerids) == 0 {
return fmt.Errorf("peer %s failed to return its closest peers, got %d", p, len(peerids))
}
return err
}

// peerRecentlyQueried returns true if p has been queried less than dht.lookupCheckInterval ago
func (dht *IpfsDHT) peerRecentlyQueried(p peer.ID) bool {
dht.recentlyCheckedPeersLk.Lock()
defer dht.recentlyCheckedPeersLk.Unlock()

now := time.Now()

// clean recentlyCheckedPeers
for peerid, t := range dht.recentlyCheckedPeers {
// remove peers that have been queried more than lookupCheckInterval ago
if t.Add(dht.lookupCheckInterval).Before(now) {
delete(dht.recentlyCheckedPeers, peerid)
}
}
guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved

// if p still in recentlyCheckedPeers, it has been queried less than
// lookupCheckInterval ago
_, ok := dht.recentlyCheckedPeers[p]
return ok
}

func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
Expand All @@ -363,16 +403,11 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb
return err
}

pingFnc := func(ctx context.Context, p peer.ID) error {
_, err := dht.protoMessenger.GetClosestPeers(ctx, p, p) // don't use the PING message type as it's deprecated
return err
}

r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
keyGenFnc,
queryFnc,
pingFnc,
dht.lookupCheck,
cfg.RoutingTable.RefreshQueryTimeout,
cfg.RoutingTable.RefreshInterval,
maxLastSuccessfulOutboundThreshold,
Expand Down Expand Up @@ -480,7 +515,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
// we try to add all peers we are connected to to the Routing Table
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(ctx, p, false)
dht.peerFound(ctx, p)
}

// TODO Active Bootstrapping
Expand Down Expand Up @@ -591,22 +626,22 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
select {
case <-timerCh:
dht.routingTable.MarkAllPeersIrreplaceable()
case addReq := <-dht.addPeerToRTChan:
prevSize := dht.routingTable.Size()
if prevSize == 0 {
case p := <-dht.addPeerToRTChan:
if dht.routingTable.Size() == 0 {
isBootsrapping = true
bootstrapCount = 0
timerCh = nil
}
newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootsrapping)
// queryPeer set to true as we only try to add queried peers to the RT
newlyAdded, err := dht.routingTable.TryAddPeer(p, true, isBootsrapping)
if err != nil {
// peer not added.
continue
}
if !newlyAdded && addReq.queryPeer {
if !newlyAdded {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now())
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
case <-dht.refreshFinishedCh:
bootstrapCount = bootstrapCount + 1
Expand All @@ -626,39 +661,55 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
}
}

// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
// If we have a connection a peer but no exchange of a query RPC ->
//
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
// LastUsefulAt=0
//
// If we connect to a peer and then exchange a query RPC ->
//
// LastQueriedAt=time.Now (same reason as above)
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
//
// If we query a peer we already have in our Routing Table ->
//
// LastQueriedAt=time.Now()
// LastUsefulAt remains unchanged
//
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//
// Do Nothing.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
// peerFound verifies whether the found peer advertises DHT protocols
// and probe it to make sure it answers DHT queries as expected. If
// it fails to answer, it isn't added to the routingTable.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {

guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved
// if the appropriate bucket is already full, don't try to add the new peer.ID
if !dht.routingTable.UsefulPeer(p) {
return
}

// verify whether the remote peer advertises the right dht protocol
b, err := dht.validRTPeer(p)
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-dht.ctx.Done():
if dht.peerRecentlyQueried(p) {
// peer was already queried recently and didn't make it to the bucket
return
}
guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved

livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout)
defer cancel()

// performing a FIND_NODE query
if err := dht.lookupCheck(livelinessCtx, p); err != nil {
logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err)
guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved
return
}
// add peer.ID to recently queried peers
dht.recentlyCheckedPeersLk.Lock()
dht.recentlyCheckedPeers[p] = time.Now()
dht.recentlyCheckedPeersLk.Unlock()
guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved
guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved

// if the FIND_NODE succeeded, the peer is considered as valid
dht.validPeerFound(ctx, p)
}
}

// validPeerFound signals the routingTable that we've found a peer that
// supports the DHT protocol, and just answered correctly to a DHT FindPeers
func (dht *IpfsDHT) validPeerFound(ctx context.Context, p peer.ID) {
if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
}

select {
case dht.addPeerToRTChan <- p:
case <-dht.ctx.Done():
return
}
}

Expand Down
8 changes: 6 additions & 2 deletions dht_bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func TestBootstrappersReplacable(t *testing.T) {
}
require.Len(t, d.routingTable.ListPeers(), 0)

d.recentlyCheckedPeersLk.Lock()
d.recentlyCheckedPeers = make(map[peer.ID]time.Time)
d.recentlyCheckedPeersLk.Unlock()
guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved

// adding d1 & d2 works now because there is space in the Routing Table
require.NoError(t, d.host.Network().ClosePeer(d1.self))
require.NoError(t, d.host.Network().ClosePeer(d2.self))
Expand Down Expand Up @@ -191,8 +195,8 @@ func TestBootstrappersReplacable(t *testing.T) {
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self, true)
d.peerFound(ctx, d1.self, true)
d.peerFound(ctx, d5.self)
d.peerFound(ctx, d1.self)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
Expand Down
2 changes: 1 addition & 1 deletion dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
}

// a peer has queried us, let's add it to RT
dht.peerFound(dht.ctx, mPeer, true)
go dht.peerFound(dht.ctx, mPeer)
guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved

if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
Expand Down
66 changes: 66 additions & 0 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-multistream"
"golang.org/x/exp/maps"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -767,6 +768,7 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) {

// and because of the above bootstrap, A also discovers E !
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second)
time.Sleep(100 * time.Millisecond)
guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
}

Expand Down Expand Up @@ -1325,6 +1327,49 @@ func TestClientModeConnect(t *testing.T) {
}
}

func TestInvalidServer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

a := setupDHT(ctx, t, false)
b := setupDHT(ctx, t, true)

// make b advertise all dht server protocols
for _, proto := range a.serverProtocols {
// Hang on every request.
b.host.SetStreamHandler(proto, func(s network.Stream) {
defer s.Reset() // nolint
<-ctx.Done()
})
}

connectNoSync(t, ctx, a, b)

c := testCaseCids[0]
p := peer.ID("TestPeer")
a.ProviderStore().AddProvider(ctx, c.Hash(), peer.AddrInfo{ID: p})
time.Sleep(time.Millisecond * 5) // just in case...

provs, err := b.FindProviders(ctx, c)
if err != nil {
t.Fatal(err)
}

if len(provs) == 0 {
t.Fatal("Expected to get a provider back")
}

if provs[0].ID != p {
t.Fatal("expected it to be our test peer")
}
if a.routingTable.Find(b.self) != "" {
t.Fatal("DHT clients should not be added to routing tables")
}
if b.routingTable.Find(a.self) == "" {
t.Fatal("DHT server should have been added to the dht client's routing table")
}
}

func TestClientModeFindPeer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -1519,6 +1564,11 @@ func TestFixLowPeers(t *testing.T) {
mainD.routingTable.RemovePeer(d.self)
}

// remove blacklist of already contacted peers
mainD.recentlyCheckedPeersLk.Lock()
maps.Clear(mainD.recentlyCheckedPeers)
mainD.recentlyCheckedPeersLk.Unlock()

// but we will still get enough peers in the RT because of fix low Peers
waitForWellFormedTables(t, []*IpfsDHT{mainD}, minRTRefreshThreshold, minRTRefreshThreshold, 5*time.Second)
}
Expand Down Expand Up @@ -1624,6 +1674,15 @@ func TestHandleRemotePeerProtocolChanges(t *testing.T) {

connect(t, ctx, dhtA, dhtB)

// clear connection history
dhtA.recentlyCheckedPeersLk.Lock()
maps.Clear(dhtA.recentlyCheckedPeers)
dhtA.recentlyCheckedPeersLk.Unlock()

dhtB.recentlyCheckedPeersLk.Lock()
maps.Clear(dhtB.recentlyCheckedPeers)
dhtB.recentlyCheckedPeersLk.Unlock()

// now assert both have each other in their RT
waitForWellFormedTables(t, []*IpfsDHT{dhtA, dhtB}, 1, 1, 10*time.Second)

Expand Down Expand Up @@ -2124,6 +2183,13 @@ func TestPreconnectedNodes(t *testing.T) {
require.NoError(t, err)
defer h2.Close()

// clear d2 recent checked peers
d2.recentlyCheckedPeersLk.Lock()
maps.Clear(d2.recentlyCheckedPeers)
d2.recentlyCheckedPeersLk.Unlock()

connect(t, ctx, d1, d2)

// See if it works
peers, err := d2.GetClosestPeers(ctx, "testkey")
require.NoError(t, err)
Expand Down
Loading