diff --git a/go.mod b/go.mod index 71f84b591c..11cfe0e236 100644 --- a/go.mod +++ b/go.mod @@ -20,8 +20,10 @@ require ( github.com/spf13/cobra v0.0.5 github.com/stretchr/testify v1.3.0 go.etcd.io/bbolt v1.3.3 - golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 - golang.org/x/net v0.0.0-20190620200207-3b0461eec859 + golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 + golang.org/x/net v0.0.0-20190628185345-da137c7871d7 + golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect + golang.org/x/tools v0.0.0-20190719005602-e377ae9d6386 // indirect ) // Uncomment for tests with alternate branches of 'dmsg' diff --git a/go.sum b/go.sum index 3862b75d45..3344ce8736 100644 --- a/go.sum +++ b/go.sum @@ -117,11 +117,13 @@ golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 h1:ydJNl0ENAG67pFbB+9tfhiL2pYqLhfoaZFw/cjLhY4A= golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -134,12 +136,15 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190627182818-9947fec5c3ab/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190719005602-e377ae9d6386 h1:W/t3IYUOQPd8DK2ssOWA8sjulHHMxzTgiQkSx0z5sRQ= +golang.org/x/tools v0.0.0-20190719005602-e377ae9d6386/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/pkg/router/router.go b/pkg/router/router.go index 7606ce4cdc..40ee9b1283 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -79,34 +79,38 @@ func New(config *Config) *Router { // Serve starts transport listening loop. func (r *Router) Serve(ctx context.Context) error { - go func() { - for tp := range r.tm.DataTpChan { - r.mu.Lock() - isAccepted := tp.Accepted - r.mu.Unlock() - - r.Logger.Infof("New transport: isAccepted: %v, isSetup: %v", isAccepted, false) - - r.handleTransport(tp, isAccepted, false) - } - }() + r.Logger.Info("Starting router") go func() { - for tp := range r.tm.SetupTpChan { - r.Logger.Infof("New transport: isAccepted: %v, isSetup: %v", true, true) - r.handleTransport(tp, true, false) - } - }() + for { + select { + case dTp, ok := <-r.tm.DataTpChan: + if !ok { + return + } + r.Logger.Infof("New transport: isAccepted: %v, isSetup: %v", dTp.Accepted, false) + r.handleTransport(dTp, dTp.Accepted, false) + initStatus := "locally" + if dTp.Accepted { + initStatus = "remotely" + } + r.Logger.Infof("New %s-initiated transport: purpose(data)", initStatus) + r.handleTransport(dTp, dTp.Accepted, false) - go func() { - for range r.expiryTicker.C { - if err := r.rm.rt.Cleanup(); err != nil { - r.Logger.Warnf("Failed to expiry routes: %s", err) + case sTp, ok := <-r.tm.SetupTpChan: + if !ok { + return + } + r.Logger.Infof("New remotely-initiated transport: purpose(setup)") + r.handleTransport(sTp, true, true) + + case <-r.expiryTicker.C: + if err := r.rm.rt.Cleanup(); err != nil { + r.Logger.Warnf("Failed to expiry routes: %s", err) + } } } }() - - r.Logger.Info("Starting router") return r.tm.Serve(ctx) } @@ -434,7 +438,7 @@ func (r *Router) setupProto(ctx context.Context) (*setup.Protocol, transport.Tra } // TODO(evanlinjin): need string constant for tp type. - tr, err := r.tm.CreateSetupTransport(ctx, r.config.SetupNodes[0], dmsg.Type, false) + tr, err := r.tm.CreateSetupTransport(ctx, r.config.SetupNodes[0], dmsg.Type) if err != nil { return nil, nil, fmt.Errorf("setup transport: %s", err) } diff --git a/pkg/transport/dmsg/config.go b/pkg/transport/dmsg/config.go deleted file mode 100644 index 44dc2a4268..0000000000 --- a/pkg/transport/dmsg/config.go +++ /dev/null @@ -1,17 +0,0 @@ -package dmsg - -import ( - "time" - - "github.com/skycoin/dmsg/cipher" - "github.com/skycoin/dmsg/disc" -) - -// Config configures dmsg -type Config struct { - PubKey cipher.PubKey - SecKey cipher.SecKey - Discovery disc.APIClient - Retries int - RetryDelay time.Duration -} diff --git a/pkg/transport/dmsg/client.go b/pkg/transport/dmsg/dmsg.go similarity index 63% rename from pkg/transport/dmsg/client.go rename to pkg/transport/dmsg/dmsg.go index c40bddb1d0..d9c41404b4 100644 --- a/pkg/transport/dmsg/client.go +++ b/pkg/transport/dmsg/dmsg.go @@ -2,6 +2,8 @@ package dmsg import ( "context" + "net" + "time" "github.com/skycoin/dmsg" "github.com/skycoin/dmsg/cipher" @@ -16,14 +18,31 @@ const ( Type = dmsg.Type ) -// Client is a wrapper type for "github.com/skycoin/dmsg".Client -type Client struct { - *dmsg.Client +// Config configures dmsg +type Config struct { + PubKey cipher.PubKey + SecKey cipher.SecKey + Discovery disc.APIClient + Retries int + RetryDelay time.Duration +} + +// Server is an alias for dmsg.Server. +type Server = dmsg.Server + +// NewServer is an alias for dmsg.NewServer. +func NewServer(pk cipher.PubKey, sk cipher.SecKey, addr string, l net.Listener, dc disc.APIClient) (*Server, error) { + return dmsg.NewServer(pk, sk, addr, l, dc) } // ClientOption is a wrapper type for "github.com/skycoin/dmsg".ClientOption type ClientOption = dmsg.ClientOption +// Client is a wrapper type for "github.com/skycoin/dmsg".Client +type Client struct { + *dmsg.Client +} + // NewClient is a wrapper type for "github.com/skycoin/dmsg".NewClient func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, opts ...ClientOption) *Client { return &Client{ @@ -50,22 +69,22 @@ func (c *Client) Dial(ctx context.Context, remote cipher.PubKey) (transport.Tran return NewTransport(tp), nil } -// Close is a wrapper type for "github.com/skycoin/dmsg".Close -func (c *Client) Close() error { - return c.Client.Close() +// SetLogger is a wrapper type for "github.com/skycoin/dmsg".SetLogger +func SetLogger(log *logging.Logger) ClientOption { + return dmsg.SetLogger(log) } -// Local is a wrapper type for "github.com/skycoin/dmsg".Local -func (c *Client) Local() cipher.PubKey { - return c.Client.Local() +// Transport is a wrapper type for "github.com/skycoin/dmsg".Transport +type Transport struct { + *dmsg.Transport } -// Type is a wrapper type for "github.com/skycoin/dmsg".Type -func (c *Client) Type() string { - return c.Client.Type() +// NewTransport creates a new Transport. +func NewTransport(tp *dmsg.Transport) *Transport { + return &Transport{Transport: tp} } -// SetLogger is a wrapper type for "github.com/skycoin/dmsg".SetLogger -func SetLogger(log *logging.Logger) ClientOption { - return dmsg.SetLogger(log) +// Edges returns sorted edges of transport +func (tp *Transport) Edges() [2]cipher.PubKey { + return transport.SortPubKeys(tp.LocalPK(), tp.RemotePK()) } diff --git a/pkg/transport/dmsg/server.go b/pkg/transport/dmsg/server.go deleted file mode 100644 index cdb1db0575..0000000000 --- a/pkg/transport/dmsg/server.go +++ /dev/null @@ -1,17 +0,0 @@ -package dmsg - -import ( - "net" - - "github.com/skycoin/dmsg" - "github.com/skycoin/dmsg/cipher" - "github.com/skycoin/dmsg/disc" -) - -// Server is an alias for dmsg.Server. -type Server = dmsg.Server - -// NewServer is an alias for dmsg.NewServer. -func NewServer(pk cipher.PubKey, sk cipher.SecKey, addr string, l net.Listener, dc disc.APIClient) (*Server, error) { - return dmsg.NewServer(pk, sk, addr, l, dc) -} diff --git a/pkg/transport/dmsg/transport.go b/pkg/transport/dmsg/transport.go deleted file mode 100644 index cbf90a9ba8..0000000000 --- a/pkg/transport/dmsg/transport.go +++ /dev/null @@ -1,50 +0,0 @@ -package dmsg - -import ( - "time" - - "github.com/skycoin/dmsg" - "github.com/skycoin/dmsg/cipher" - - "github.com/skycoin/skywire/pkg/transport" -) - -// Transport is a wrapper type for "github.com/skycoin/dmsg".Transport -type Transport struct { - *dmsg.Transport -} - -// NewTransport creates a new Transport. -func NewTransport(tp *dmsg.Transport) *Transport { - return &Transport{Transport: tp} -} - -// Read is a wrapper for "github.com/skycoin/dmsg".(*Transport).Read -func (tp *Transport) Read(p []byte) (n int, err error) { - return tp.Transport.Read(p) -} - -// Write is a wrapper for "github.com/skycoin/dmsg".(*Transport).Write -func (tp *Transport) Write(p []byte) (n int, err error) { - return tp.Transport.Write(p) -} - -// Close is a wrapper for "github.com/skycoin/dmsg".(*Transport).Close -func (tp *Transport) Close() error { - return tp.Transport.Close() -} - -// Edges returns sorted edges of transport -func (tp *Transport) Edges() [2]cipher.PubKey { - return transport.SortPubKeys(tp.LocalPK(), tp.RemotePK()) -} - -// SetDeadline is a wrapper for "github.com/skycoin/dmsg".(*Transport).SetDeadline -func (tp *Transport) SetDeadline(t time.Time) error { - return tp.Transport.SetDeadline(t) -} - -// Type is a wrapper for "github.com/skycoin/dmsg".(*Transport).Type -func (tp *Transport) Type() string { - return tp.Transport.Type() -} diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index a2178cad11..1d4da4e8fa 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -180,7 +180,7 @@ func (tm *Manager) createDefaultTransports(ctx context.Context) { if exist { continue } - _, err := tm.CreateDataTransport(ctx, pk, "messaging", true) + _, err := tm.CreateDataTransport(ctx, pk, "dmsg", true) if err != nil { tm.Logger.Warnf("Failed to establish transport to a node %s: %s", pk, err) } @@ -225,36 +225,23 @@ func (tm *Manager) Serve(ctx context.Context) error { } // CreateSetupTransport begins to attempt to establish setup transports to the given 'remote' node. -func (tm *Manager) CreateSetupTransport(ctx context.Context, remote cipher.PubKey, tpType string, public bool) (Transport, error) { - factory := tm.factories[tpType] - if factory == nil { +func (tm *Manager) CreateSetupTransport(ctx context.Context, remote cipher.PubKey, tpType string) (Transport, error) { + factory, ok := tm.factories[tpType] + if !ok { return nil, errors.New("unknown transport type") } - - tr, entry, err := tm.dialTransport(ctx, factory, remote, public) + tr, err := factory.Dial(ctx, remote) if err != nil { return nil, err } - - oldTr := tm.Transport(entry.ID) - if oldTr != nil { - oldTr.killWorker() - } - - tm.Logger.Infof("Dialed to %s using %s factory. Transport ID: %s", remote, tpType, entry.ID) - - select { - case <-tm.doneChan: - return nil, io.ErrClosedPipe - case tm.SetupTpChan <- tr: - return tr, nil - } + tm.Logger.Infof("Dialed to setup node %s using %s factory.", remote, tpType) + return tr, nil } // CreateDataTransport begins to attempt to establish data transports to the given 'remote' node. func (tm *Manager) CreateDataTransport(ctx context.Context, remote cipher.PubKey, tpType string, public bool) (*ManagedTransport, error) { - factory := tm.factories[tpType] - if factory == nil { + factory, ok := tm.factories[tpType] + if !ok { return nil, errors.New("unknown transport type") } @@ -347,25 +334,23 @@ func (tm *Manager) dialTransport(ctx context.Context, factory Factory, remote ci if tm.isClosing() { return nil, nil, errors.New("transport.Manager is closing. Skipping dialing transport") } + if tm.IsSetupPK(remote) { + return nil, nil, errors.New("cannot dial to setup node") + } tr, err := factory.Dial(ctx, remote) if err != nil { return nil, nil, err } - var entry *Entry - if tm.IsSetupTransport(tr) { - entry = makeEntry(tr, public) - } else { - entry, err = settlementInitiatorHandshake(public).Do(tm, tr, time.Minute) - if err != nil { - go func() { - if err := tr.Close(); err != nil { - tm.Logger.Warnf("Failed to close transport: %s", err) - } - }() - return nil, nil, err - } + entry, err := settlementInitiatorHandshake(public).Do(tm, tr, time.Minute) + if err != nil { + go func() { + if err := tr.Close(); err != nil { + tm.Logger.Warnf("Failed to close transport: %s", err) + } + }() + return nil, nil, err } return tr, entry, nil @@ -381,35 +366,12 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (Transp return nil, errors.New("transport.Manager is closing. Skipping incoming transport") } - var entry *Entry - isSetup := tm.IsSetupTransport(tr) - if isSetup { - entry = makeEntry(tr, false) - } else { - entry, err = settlementResponderHandshake().Do(tm, tr, 30*time.Second) - if err != nil { - go func() { - if err = tr.Close(); err != nil { - tm.Logger.Warnf("Failed to close transport: %s", err) - } - }() - return nil, err - } - } - - remote, ok := tm.Remote(tr.Edges()) + remotePK, ok := tm.Remote(tr.Edges()) if !ok { - return nil, errors.New("remote pubkey not found in edges") - } - - tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), remote, entry.ID) - - oldTr := tm.Transport(entry.ID) - if oldTr != nil { - oldTr.killWorker() + return nil, errors.New("failed to determine remote edge of accepted transport") } - if isSetup { + if isSetup := tm.IsSetupPK(remotePK); isSetup { select { case <-tm.doneChan: return nil, io.ErrClosedPipe @@ -418,6 +380,22 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (Transp } } + entry, err := settlementResponderHandshake().Do(tm, tr, 30*time.Second) + if err != nil { + go func() { + if err = tr.Close(); err != nil { + tm.Logger.Warnf("Failed to close transport: %s", err) + } + }() + return nil, err + } + + tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), remotePK, entry.ID) + + if oldTr := tm.Transport(entry.ID); oldTr != nil { + oldTr.killWorker() + } + mTr := newManagedTransport(tr, *entry, true) tm.mu.Lock() @@ -428,7 +406,7 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (Transp case <-tm.doneChan: return nil, io.ErrClosedPipe case tm.DataTpChan <- mTr: - go tm.manageTransport(ctx, mTr, factory, remote) + go tm.manageTransport(ctx, mTr, factory, remotePK) return mTr, nil } } @@ -526,13 +504,11 @@ func (tm *Manager) manageTransport(ctx context.Context, mTr *ManagedTransport, f } // IsSetupTransport checks whether `tr` is running in the `setup` mode. -func (tm *Manager) IsSetupTransport(tr Transport) bool { - for _, pk := range tm.setupNodes { - remote, ok := tm.Remote(tr.Edges()) - if ok && (remote == pk) { +func (tm *Manager) IsSetupPK(pk cipher.PubKey) bool { + for _, sPK := range tm.setupNodes { + if sPK == pk { return true } } - return false }