fix connection tracking race
Before, the following could happen:

1. Create two connections (both sides connect at the same time).
2. Start testing the DHT protocol on the first one.
3. Lose the first connection.
4. Get a stream reset and conclude that the other side doesn't support the
DHT protocol.

We previously tried to fix this by checking for an EOF. Unfortunately, reset
streams don't return EOFs.

This commit also simplifies peer tracking (and saves a bit of memory).

fixes ipfs#99
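Distilled, the fix is to pin the protocol probe to the connection that triggered the notification instead of asking the host for a fresh stream, which may ride a different (possibly dying) connection to the same peer and turn its reset into a bogus "protocol not supported" verdict. A minimal sketch using the libp2p interfaces this repo already imports; probeDHTSupport is a hypothetical name, the real logic lives in testConnection in notif.go below:

```go
package dht

import (
	inet "github.com/libp2p/go-libp2p-net"
	mstream "github.com/multiformats/go-multistream"
)

// probeDHTSupport opens the probe stream on *this* connection, so a
// reset can only mean this connection failed (or the peer truly
// rejected the protocol), never that some sibling connection died.
func probeDHTSupport(conn inet.Conn) (string, error) {
	s, err := conn.NewStream()
	if err != nil {
		return "", err // connection-level failure, not a protocol verdict
	}
	defer s.Close()

	// Negotiate one of our DHT protocols over this specific stream.
	return mstream.SelectOneOf(
		[]string{string(ProtocolDHT), string(ProtocolDHTOld)}, s)
}
```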
Stebalien committed Jan 7, 2018
1 parent ceab788 commit 36ae474
Showing 2 changed files with 41 additions and 68 deletions.
dht.go: 4 changes (1 addition & 3 deletions)
@@ -63,8 +63,7 @@ type IpfsDHT struct {
strmap map[peer.ID]*messageSender
smlk sync.Mutex

-	plk   sync.Mutex
-	peers map[peer.ID]*peerTracker
+	plk sync.Mutex
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
@@ -119,7 +118,6 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: rt,
-	peers: make(map[peer.ID]*peerTracker),

Validator: make(record.Validator),
Selector: make(record.Selector),
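With the peers map and its refcounts gone, connection liveness is delegated to the swarm itself. A sketch of the simplified model, assuming the go-libp2p-net API used elsewhere in this diff (isConnected is a hypothetical helper; the real checks are inlined in notif.go below):

```go
import (
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
)

// isConnected asks the network directly instead of consulting a
// refcounted map. plk now only serializes this check against
// concurrent connect/disconnect notifications.
func (dht *IpfsDHT) isConnected(p peer.ID) bool {
	return dht.host.Network().Connectedness(p) == inet.Connected
}
```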
notif.go: 105 changes (40 additions & 65 deletions)
@@ -1,9 +1,6 @@
package dht

import (
"context"
"io"

inet "github.com/libp2p/go-libp2p-net"
ma "github.com/multiformats/go-multiaddr"
mstream "github.com/multiformats/go-multistream"
@@ -12,15 +9,12 @@ import (
// netNotifiee defines methods to be used with the IpfsDHT
type netNotifiee IpfsDHT

+var dhtProtocols = []string{string(ProtocolDHT), string(ProtocolDHTOld)}

func (nn *netNotifiee) DHT() *IpfsDHT {
return (*IpfsDHT)(nn)
}

-type peerTracker struct {
-	refcount int
-	cancel   func()
-}

func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
@@ -29,61 +23,51 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
default:
}

-	dht.plk.Lock()
-	defer dht.plk.Unlock()
-
-	conn, ok := nn.peers[v.RemotePeer()]
-	if ok {
-		conn.refcount++
+	p := v.RemotePeer()
+	protos, err := dht.peerstore.SupportsProtocols(p, dhtProtocols...)
+	if err == nil && len(protos) != 0 {
+		dht.plk.Lock()
+		defer dht.plk.Unlock()
+		if dht.host.Network().Connectedness(p) == inet.Connected {
+			dht.Update(dht.Context(), p)
+		}
return
}

-	ctx, cancel := context.WithCancel(dht.Context())
-
-	nn.peers[v.RemotePeer()] = &peerTracker{
-		refcount: 1,
-		cancel:   cancel,
-	}
-
-	// Note: We *could* just check the peerstore to see if the remote side supports the dht
-	// protocol, but its not clear that that information will make it into the peerstore
-	// by the time this notification is sent. So just to be very careful, we brute force this
-	// and open a new stream
-	go nn.testConnection(ctx, v)

+	// Note: Unfortunately, the peerstore may not yet know that this peer is
+	// a DHT server. So, if it didn't return a positive response above, test
+	// manually.
+	go nn.testConnection(v)
}

-func (nn *netNotifiee) testConnection(ctx context.Context, v inet.Conn) {
+func (nn *netNotifiee) testConnection(v inet.Conn) {
dht := nn.DHT()
-	for {
-		s, err := dht.host.NewStream(ctx, v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)
-
-		switch err {
-		case nil:
-			s.Close()
-			dht.plk.Lock()
-
-			// Check if canceled under the lock.
-			if ctx.Err() == nil {
-				dht.Update(dht.Context(), v.RemotePeer())
-			}
-
-			dht.plk.Unlock()
-		case io.EOF:
-			if ctx.Err() == nil {
-				// Connection died but we may still have *an* open connection (context not canceled) so try again.
-				continue
-			}
-		case context.Canceled:
-			// Context canceled while connecting.
-		case mstream.ErrNotSupported:
-			// Client mode only, don't bother adding them to our routing table
-		default:
-			// real error? thats odd
-			log.Warningf("checking dht client type: %s", err)
-		}
+	p := v.RemotePeer()
+
+	// Forcibly use *this* connection. Otherwise, if we have two connections, we could:
+	// 1. Test it twice.
+	// 2. Have it closed out from under us, leaving the second (open) connection untested.
+	s, err := v.NewStream()
+	if err != nil {
+		// Connection error
+		return
+	}
+	defer s.Close()
+
+	selected, err := mstream.SelectOneOf(dhtProtocols, s)
+	if err != nil {
+		// Doesn't support the protocol
+		return
+	}
+	// Remember this choice (makes subsequent negotiations faster)
+	dht.peerstore.AddProtocols(p, selected)
+
+	dht.plk.Lock()
+	defer dht.plk.Unlock()
+	// Make sure we're still connected under the lock (race with disconnect)
+	if dht.host.Network().Connectedness(p) == inet.Connected {
+		dht.Update(dht.Context(), p)
+	}
}

func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
@@ -100,16 +84,7 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
dht.plk.Lock()
defer dht.plk.Unlock()

-	conn, ok := nn.peers[p]
-	if !ok {
-		// Unmatched disconnects are fine. It just means that we were
-		// already connected when we registered the listener.
-		return
-	}
-	conn.refcount -= 1
-	if conn.refcount == 0 {
-		delete(nn.peers, p)
-		conn.cancel()
+	if dht.host.Network().Connectedness(p) != inet.Connected {
dht.routingTable.Remove(p)
}
}()
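For context, these handlers fire because the DHT registers itself as a network notifiee. A sketch of the standard libp2p wiring (NewDHT already performs this registration; registerNotifiee is just an illustrative wrapper):

```go
import (
	host "github.com/libp2p/go-libp2p-host"
)

// registerNotifiee subscribes the DHT's handlers (Connected,
// Disconnected, ...) to the swarm's connection events.
func registerNotifiee(dht *IpfsDHT, h host.Host) {
	h.Network().Notify((*netNotifiee)(dht))
}
```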
