Skip to content

Commit

Permalink
p2p: simplify listener startup
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <[email protected]>
  • Loading branch information
magik6k committed Sep 6, 2018
1 parent ec30745 commit 61af105
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 51 deletions.
36 changes: 5 additions & 31 deletions p2p/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"

net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
p2phost "gx/ipfs/QmfH9FKYv3Jp1xiyL8sPchGBUBg6JA6XviwajAo3qgnT3B/go-libp2p-host"
Expand All @@ -17,7 +16,6 @@ type Listener interface {
ListenAddress() ma.Multiaddr
TargetAddress() ma.Multiaddr

start() error
key() string

// Close closes the listener. Does not affect child streams
Expand All @@ -30,20 +28,17 @@ type Listeners struct {
sync.RWMutex

Listeners map[string]Listener
starting map[string]struct{}
}

func newListenersLocal(id peer.ID) *Listeners {
func newListenersLocal() *Listeners {
return &Listeners{
Listeners: map[string]Listener{},
starting: map[string]struct{}{},
}
}

func newListenersP2P(id peer.ID, host p2phost.Host) *Listeners {
func newListenersP2P(host p2phost.Host) *Listeners {
reg := &Listeners{
Listeners: map[string]Listener{},
starting: map[string]struct{}{},
}

host.SetStreamHandlerMatch("/x/", func(p string) bool {
Expand All @@ -67,31 +62,14 @@ func newListenersP2P(id peer.ID, host p2phost.Host) *Listeners {

// Register registers listenerInfo into this registry and starts it
func (r *Listeners) Register(l Listener) error {
r.Lock()
k := l.key()

if _, ok := r.Listeners[k]; ok {
r.Unlock()
return errors.New("listener already registered")
}

r.Listeners[k] = l
r.starting[k] = struct{}{}

r.Unlock()

err := l.start()

r.Lock()
defer r.Unlock()

delete(r.starting, k)

if err != nil {
delete(r.Listeners, k)
return err
if _, ok := r.Listeners[l.key()]; ok {
return errors.New("listener already registered")
}

r.Listeners[l.key()] = l
return nil
}

Expand All @@ -100,10 +78,6 @@ func (r *Listeners) Deregister(k string) (bool, error) {
r.Lock()
defer r.Unlock()

if _, ok := r.starting[k]; ok {
return false, errors.New("listener didn't start yet")
}

_, ok := r.Listeners[k]
delete(r.Listeners, k)
return ok, nil
Expand Down
20 changes: 7 additions & 13 deletions p2p/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type localListener struct {
ctx context.Context

p2p *P2P
id peer.ID

proto protocol.ID
laddr ma.Multiaddr
Expand All @@ -32,13 +31,19 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.I
ctx: ctx,

p2p: p2p,
id: p2p.identity,

proto: proto,
laddr: bindAddr,
peer: peer,
}

maListener, err := manet.Listen(listener.laddr)
if err != nil {
return nil, err
}

listener.listener = maListener

if err := p2p.ListenersLocal.Register(listener); err != nil {
return nil, err
}
Expand Down Expand Up @@ -91,17 +96,6 @@ func (l *localListener) setupStream(local manet.Conn) {
}

l.p2p.Streams.Register(stream)
stream.startStreaming()
}

func (l *localListener) start() error {
maListener, err := manet.Listen(l.laddr)
if err != nil {
return err
}

l.listener = maListener
return nil
}

func (l *localListener) Close() error {
Expand Down
4 changes: 2 additions & 2 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore)
peerHost: peerHost,
peerstore: peerstore,

ListenersLocal: newListenersLocal(identity),
ListenersP2P: newListenersP2P(identity, peerHost),
ListenersLocal: newListenersLocal(),
ListenersP2P: newListenersP2P(peerHost),

Streams: &StreamRegistry{
Streams: map[uint64]*Stream{},
Expand Down
5 changes: 0 additions & 5 deletions p2p/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu
return listener, nil
}

func (l *remoteListener) start() error {
return nil
}

func (l *remoteListener) handleStream(remote net.Stream) {
local, err := manet.Dial(l.addr)
if err != nil {
Expand Down Expand Up @@ -71,7 +67,6 @@ func (l *remoteListener) handleStream(remote net.Stream) {
}

l.p2p.Streams.Register(stream)
stream.startStreaming()
}

func (l *remoteListener) Protocol() protocol.ID {
Expand Down
2 changes: 2 additions & 0 deletions p2p/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (r *StreamRegistry) Register(streamInfo *Stream) {
streamInfo.id = r.nextID
r.Streams[r.nextID] = streamInfo
r.nextID++

streamInfo.startStreaming()
}

// Deregister deregisters stream from the registry
Expand Down

0 comments on commit 61af105

Please sign in to comment.