Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: more concurrency fixups #704

Merged
merged 5 commits into from
Apr 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions p2p/discover/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
Expand All @@ -31,9 +29,6 @@ type Node struct {

DiscPort int // UDP listening port for discovery protocol
TCPPort int // TCP listening port for RLPx

// this must be set/read using atomic load and store.
activeStamp int64
}

func newNode(id NodeID, addr *net.UDPAddr) *Node {
Expand All @@ -50,16 +45,6 @@ func (n *Node) isValid() bool {
return !n.IP.IsMulticast() && !n.IP.IsUnspecified() && n.TCPPort != 0 && n.DiscPort != 0
}

func (n *Node) bumpActive() {
stamp := time.Now().Unix()
atomic.StoreInt64(&n.activeStamp, stamp)
}

func (n *Node) active() time.Time {
stamp := atomic.LoadInt64(&n.activeStamp)
return time.Unix(stamp, 0)
}

func (n *Node) addr() *net.UDPAddr {
return &net.UDPAddr{IP: n.IP, Port: n.DiscPort}
}
Expand Down
1 change: 0 additions & 1 deletion p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ outer:
func (b *bucket) bump(n *Node) bool {
for i := range b.entries {
if b.entries[i].ID == n.ID {
n.bumpActive()
// move it to the front
copy(b.entries[1:], b.entries[:i])
b.entries[0] = n
Expand Down
5 changes: 3 additions & 2 deletions p2p/discover/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,12 @@ func (t *udp) loop() {
defer timeout.Stop()

rearmTimeout := func() {
if len(pending) == 0 || nextDeadline == pending[0].deadline {
now := time.Now()
if len(pending) == 0 || now.Before(nextDeadline) {
return
}
nextDeadline = pending[0].deadline
timeout.Reset(nextDeadline.Sub(time.Now()))
timeout.Reset(nextDeadline.Sub(now))
}

for {
Expand Down
2 changes: 1 addition & 1 deletion p2p/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake,
// returning the handshake read error. If the remote side
// disconnects us early with a valid reason, we should return it
// as the error so it can be tracked elsewhere.
werr := make(chan error)
werr := make(chan error, 1)
go func() { werr <- Send(rw, handshakeMsg, our) }()
rhs, err := readProtocolHandshake(rw, secrets.RemoteID, our)
if err != nil {
Expand Down
30 changes: 8 additions & 22 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"sort"
"sync"
Expand All @@ -20,8 +19,7 @@ const (
baseProtocolLength = uint64(16)
baseProtocolMaxMsgSize = 10 * 1024 * 1024

pingInterval = 15 * time.Second
disconnectGracePeriod = 2 * time.Second
pingInterval = 15 * time.Second
)

const (
Expand Down Expand Up @@ -129,39 +127,27 @@ func (p *Peer) run() DiscReason {
case err := <-readErr:
if r, ok := err.(DiscReason); ok {
reason = r
break
} else {
// Note: We rely on protocols to abort if there is a write
// error. It might be more robust to handle them here as well.
p.DebugDetailf("Read error: %v\n", err)
reason = DiscNetworkError
}
// Note: We rely on protocols to abort if there is a write
// error. It might be more robust to handle them here as well.
p.DebugDetailf("Read error: %v\n", err)
p.conn.Close()
reason = DiscNetworkError
case err := <-p.protoErr:
reason = discReasonForError(err)
case reason = <-p.disc:
}

close(p.closed)
p.politeDisconnect(reason)
p.wg.Wait()
if reason != DiscNetworkError {
p.politeDisconnect(reason)
}
p.Debugf("Disconnected: %v\n", reason)
return reason
}

func (p *Peer) politeDisconnect(reason DiscReason) {
done := make(chan struct{})
go func() {
if reason != DiscNetworkError {
SendItems(p.rw, discMsg, uint(reason))
// Wait for the other side to close the connection.
// Discard any data that they send until then.
io.Copy(ioutil.Discard, p.conn)
close(done)
}()
select {
case <-done:
case <-time.After(disconnectGracePeriod):
}
p.conn.Close()
}
Expand Down
2 changes: 2 additions & 0 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,11 @@ func (srv *Server) Stop() {
// No new peers can be added at this point because dialLoop and
// listenLoop are down. It is safe to call peerWG.Wait because
// peerWG.Add is not called outside of those loops.
srv.lock.Lock()
for _, peer := range srv.peers {
peer.Disconnect(DiscQuitting)
}
srv.lock.Unlock()
srv.peerWG.Wait()
}

Expand Down